C++11 线程池
作者:互联网
cpp
#include<thread>
#include<mutex>
#include<vector>
#include<condition_variable>
#include<boost/circular_buffer.hpp>
#include<functional>
#include<atomic>
#include<cassert>
class ThreadPool
{
public:
explicit ThreadPool(size_t n) :q_{ n }, running_{ false }{}
~ThreadPool()
{
Stop();
}
void Start(size_t n)
{
if (running_)return;
running_ = true;
threads_.reserve(n);
while (n--)
{
threads_.emplace_back(&ThreadPool::Worker, this);
}
}
template<typename Fun>
void submit(Fun f)
{
std::unique_lock<std::mutex> lk{ mtx_ };
not_full_.wait(lk, [this] {return !running_ || !q_.full(); });
assert(!running_||!q_.full());
if (!running_) return;
q_.push_back(std::move(f));
not_empty_.notify_one();
}
void Stop()
{
if (!running_)return;;
running_ = false;
not_full_.notify_all();
not_empty_.notify_all();
for (auto& th : threads_)
{
if (th.joinable())th.join();
}
}
private:
void Worker()
{
while (true)
{
task t;
{
std::unique_lock<std::mutex> lk{ mtx_ };
not_empty_.wait(lk, [this] {return !running_ || !q_.empty(); });
assert(!running_ || !q_.empty());
if (!running_)return;
t = std::move(q_.front()) ;
q_.pop_front();
not_full_.notify_one();
}
t();
}
}
private:
using task = std::function<void()>;
std::vector<std::thread>threads_;
boost::circular_buffer<task>q_;
std::mutex mtx_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
std::atomic<bool> running_;
};
#include<iostream>
int main()
{
ThreadPool pool{ 8 };
std::mutex mtx;
pool.Start(4);
for (int i = 0; i < 100; ++i)
{
pool.submit(
[i]{
std::cout << i << " " << std::this_thread::get_id() << std::endl;
});
}
return 0;
}
可传参带返回值的
#include <vector>
#include <thread>
#include <boost/circular_buffer.hpp>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
class task {
public:
task() = default;
template <class Fun>
explicit task(Fun&& f) : ptr_{ new wrapper{ std::move(f) } } {}
void operator()() { ptr_->call(); }
private:
class wrapper_base {
public:
virtual void call() = 0;
virtual ~wrapper_base() = default;
};
template <class Fun>
class wrapper : public wrapper_base {
public:
explicit wrapper(Fun&& f) : f_{ std::move(f) } {}
virtual void call() override { f_(); }
private:
Fun f_;
};
std::unique_ptr<wrapper_base> ptr_;
};
class thread_pool {
public:
explicit thread_pool(size_t n) : q_{ n }, running_{ false } {}
~thread_pool() { stop(); }
void start(size_t n) {
if (running_) return;
running_ = true;
threads_.reserve(n);
while (n--) {
threads_.emplace_back(&thread_pool::worker, this);
}
}
void stop() {
if (!running_) return;
running_ = false;
not_full_.notify_all();
not_empty_.notify_all();
for (auto& t : threads_) {
if (t.joinable()) t.join();
}
}
template <class Fun,typename ...Args>
auto submit(Fun&& f,Args&&...args)->std::future<decltype(f(args...))> {
using RetType = decltype(f(args...));
auto pt = std::make_unique<std::packaged_task<RetType()>>( std::bind(std::forward<Fun>(f), std::forward<Args>(args)...) );
auto ret = pt->get_future();
task t{ std::move(*pt) };
{
std::unique_lock lk{ m_ };
not_full_.wait(lk, [this] { return !running_ || !q_.full(); });
assert(!running_ || !q_.full());
if (!running_) return {};
q_.push_back(std::move(t));
not_empty_.notify_one();
}
return ret;
}
private:
void worker() {
while (true) {
task t;
{
std::unique_lock lk{ m_ };
not_empty_.wait(lk, [this] { return !running_ || !q_.empty(); });
assert(!running_ || !q_.empty());
if (!running_) return;
t = std::move(q_.front());
q_.pop_front();
not_full_.notify_one();
}
t();
}
}
std::vector<std::thread> threads_;
boost::circular_buffer<task> q_;
std::mutex m_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
std::atomic<bool> running_;
};
#include <iostream>
int func(int a)
{
std::cout << "id:" << std::this_thread::get_id() << std::endl;
return a;
}
int main() {
thread_pool pool{ 10 };
pool.start(3);
std::vector<std::future<int>> rets;
for (int i = 0; i < 100; i++) {
auto ret = pool.submit(func, i);
rets.push_back(std::move(ret));
}
for (auto& ret : rets) {
std::cout << ret.get() << " ";
}
}
标签:11,std,full,return,C++,_.,running,线程,include 来源: https://www.cnblogs.com/chengmf/p/15987224.html