JFrame(用C++11特性重构系列——线程池)
作者:互联网
线程池的基本思想:有一个异步任务队列,任何地方都可以往此队列中加任务,其中任务就是一个个待执行的函数,然后还有一个线程池,线程池中有一定数量的线程,线程池一经启动,每个运行的线程都会从这个异步队列中不断取出任务并执行!
SyncQueue
1 #ifndef SYNCQUEUE_H 2 #define SYNCQUEUE_H 3 4 #include<list> 5 #include<mutex> 6 #include<thread> 7 #include<condition_variable> 8 #include <iostream> 9 using namespace std; 10 11 template<typename T> 12 class SyncQueue 13 { 14 public: 15 SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false) 16 { 17 } 18 19 void Put(const T&x) 20 { 21 Add(x); 22 } 23 24 void Put(T&&x) 25 { 26 Add(std::forward<T>(x)); 27 } 28 29 void Take(std::list<T>& list) 30 { 31 std::unique_lock<std::mutex> locker(m_mutex); 32 m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); }); 33 34 if (m_needStop) 35 return; 36 list = std::move(m_queue); 37 m_notFull.notify_one(); 38 } 39 40 void Take(T& t) 41 { 42 std::unique_lock<std::mutex> locker(m_mutex); 43 m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); }); 44 45 if (m_needStop) 46 return; 47 t = m_queue.front(); 48 m_queue.pop_front(); 49 m_notFull.notify_one(); 50 } 51 52 void Stop() 53 { 54 { 55 std::lock_guard<std::mutex> locker(m_mutex); 56 m_needStop = true; 57 } 58 m_notFull.notify_all(); 59 m_notEmpty.notify_all(); 60 } 61 62 bool Empty() 63 { 64 std::lock_guard<std::mutex> locker(m_mutex); 65 return m_queue.empty(); 66 } 67 68 bool Full() 69 { 70 std::lock_guard<std::mutex> locker(m_mutex); 71 return m_queue.size() == m_maxSize; 72 } 73 74 size_t Size() 75 { 76 std::lock_guard<std::mutex> locker(m_mutex); 77 return m_queue.size(); 78 } 79 80 int Count() 81 { 82 return m_queue.size(); 83 } 84 private: 85 bool NotFull() const 86 { 87 bool full = m_queue.size() >= m_maxSize; 88 if (full) 89 cout << "full, waiting,thread id: " << this_thread::get_id() << endl; 90 return !full; 91 } 92 93 bool NotEmpty() const 94 { 95 bool empty = m_queue.empty(); 96 if (empty) 97 cout << "empty,waiting,thread id: " << this_thread::get_id() << endl; 98 return !empty; 99 } 100 101 template<typename F> 102 void Add(F&&x) 103 { 104 std::unique_lock< std::mutex> locker(m_mutex); 105 m_notFull.wait(locker, [this]{return m_needStop || NotFull(); }); 106 if (m_needStop) 107 return; 108 109 m_queue.push_back(std::forward<F>(x)); 110 m_notEmpty.notify_one(); 111 } 112 113 private: 114 std::list<T> m_queue; //缓冲区 115 std::mutex m_mutex; //互斥量和条件变量结合起来使用 116 std::condition_variable m_notEmpty;//不为空的条件变量 117 std::condition_variable m_notFull; //没有满的条件变量 118 int m_maxSize; //同步队列最大的size 119 120 bool m_needStop; //停止的标志 121 }; 122 123 #endif // SYNCQUEUE_H
ThreadPool
1 #ifndef THREADPOOL_H 2 #define THREADPOOL_H 3 4 #include<list> 5 #include<thread> 6 #include<functional> 7 #include<memory> 8 #include <atomic> 9 #include "SyncQueue.hpp" 10 11 const int MaxTaskCount = 2000; 12 class ThreadPool 13 { 14 public: 15 using Task = std::function<void()>; 16 ThreadPool(int numThreads = std::thread::hardware_concurrency()) 17 : m_queue(MaxTaskCount), m_numThreads(numThreads) 18 { 19 // Start(numThreads); 20 } 21 22 ~ThreadPool(void) 23 { 24 //如果没有停止时则主动停止线程池 25 Stop(); 26 } 27 28 void Stop() 29 { 30 std::call_once(m_flag, [this]{StopThreadGroup(); }); //保证多线程情况下只调用一次StopThreadGroup 31 } 32 33 void AddTask(Task&&task) 34 { 35 m_queue.Put(std::forward<Task>(task)); 36 } 37 38 void AddTask(const Task& task) 39 { 40 m_queue.Put(task); 41 } 42 43 void Start() 44 { 45 m_running = true; 46 //创建线程组 47 for (int i = 0; i <m_numThreads; ++i) 48 { 49 m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this)); 50 } 51 } 52 private: 53 void RunInThread() 54 { 55 while (m_running) 56 { 57 //取任务分别执行 58 std::list<Task> list; 59 m_queue.Take(list); 60 61 for (auto& task : list) 62 { 63 if (!m_running) 64 return; 65 66 task(); 67 } 68 } 69 } 70 71 void StopThreadGroup() 72 { 73 m_queue.Stop(); //让同步队列中的线程停止 74 m_running = false; //置为false,让内部线程跳出循环并退出 75 76 for (auto thread : m_threadgroup) //等待线程结束 77 { 78 if (thread) 79 thread->join(); 80 } 81 m_threadgroup.clear(); 82 } 83 84 std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组 85 SyncQueue<Task> m_queue; //同步队列 86 atomic_bool m_running; //是否停止的标志 87 std::once_flag m_flag; 88 int m_numThreads = 0; 89 }; 90 91 #endif // THREADPOOL_H
测试代码:
1 void testThreadPool() 2 { 3 ThreadPool pool; 4 std::thread thd1([&pool]{ 5 for(int i = 0; i < 10000; i++) 6 { 7 this_thread::sleep_for(std::chrono::seconds(1)); 8 auto thdid = this_thread::get_id(); 9 pool.AddTask([thdid]{ 10 std::cout << "同步层线程 1 的线程 ID:" << thdid << std::endl; 11 }); 12 } 13 }); 14 std::thread thd2([&pool]{ 15 for(int i = 0; i < 10000; i++) 16 { 17 this_thread::sleep_for(std::chrono::seconds(1)); 18 auto thdid = this_thread::get_id(); 19 pool.AddTask([thdid]{ 20 std::cout << "同步层线程 2 的线程 ID:" << thdid << std::endl; 21 }); 22 } 23 }); 24 pool.Start(); 25 this_thread::sleep_for(std::chrono::seconds(100)); 26 pool.Stop(); 27 thd1.join(); 28 thd2.join(); 29 }
标签:11,std,JFrame,return,void,queue,线程,include 来源: https://www.cnblogs.com/JackZheng/p/16272511.html