系统相关
首页 > 系统相关> > 线程池讲解(C++)|LinuxC/C++服务器开发

线程池讲解(C++)|LinuxC/C++服务器开发

作者:互联网

1. 前言

线程池是一般服务端或者客户端编程经常要用到的一种管理线程的类,从网上找到一个比较好的线程池实现,主要运用C++11。记录一下理解过程,帮助学习线程池和C++11。

150行代码手写线程池 :https://www.bilibili.com/video/BV1yQ4y1o7zv/

2. 代码

2.1 先上代码

threadPool.hpp

/*
 * @Author WangYubo
 * @Date 09/17/2018
 * @Description 线程池实现的头文件
 */
 
#ifndef THREAD_ _POOL_ _H
#define THREAD_ _POOL_ _H

#include <atomic>
#include <condition_ variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

class threadPool {
    using Task = std::function<void(void)>;

   private:
    std::vector<std::thread> m_pool;   //线程池
    std::atomic<int> m_idleThreadNum;  //空闲线程数量
    std::atomic<bool> m_stoped;        //是否停止线程
    std::mutex m_lock;                 //线程池锁
    std::queue<Task> m_tasks;          //待执行任务
    std::condition_variable m_cv;      //线程控制
    int m_threadNum = 0;               //线程总数
    std::string m_poolName;            //线程池名称
    //线程执行函数
    void run();

   public:
    //构造函数
    threadPool() : m_stoped(true) {}
    ~threadPool();

    //添加线程函数
    template <class F, class... Args>
    auto commit(F &&f, Args &&... args) -> std::future<decltype(f(args...))> {
        using RetType = decltype(f(args...));

        if (m_stoped) {
            return std::future<RetType>();
        }

        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        auto ret = task->get_future();
        {
            std::lock_guard<std::mutex> lock(m_lock);
            m_tasks.emplace([task] { (*task)(); });
        }
        m_cv.notify_one();
        return ret;
    }

    // 初始化
    virtual void init(int threadNum, std::string name = "ThreadPool");
};
#endif

threadPool.cpp

/*
 * @Author WangYubo
 * @Date 09/17/2018
 * @Description 线程池实现的cpp文件
 */

#include "threadPool . hpp"
#include "log . hpp"

using namespace std;

threadPool::~threadPool() {
    m_stoped = true;
    m_cv.notify_all();
    for (auto &tmp : m_pool) {
        if (tmp.joinable()) tmp.join();
    }
}

void threadPool::init(int threadNum, string name) {
    if (!m_stoped) {
        LOG_INFO("%s has been started, thread num %d", m_poolName.c_str(),
                 m_threadNum);
        return;
    }

    {
        unique_lock<mutex> lock(m_lock);
        if (!m_stoped) {
            return;
        }
        m_stoped = false;
        // 清理旧的线程,初始化新的线程
        m_pool.clear();

        for (int i = 0; i < threadNum; i++) {
            m_pool.emplace_back(thread(&threadPool::run, this));
        }
        m_threadNum = threadNum;
        m_poolName = name;
    }
    LOG_INFO("%s start thread num %d", m_poolName.c_str(), m_threadNum);
}

void threadPool::run() {
    while (true) {
        m_idleThreadNum--;
        Task task;
        {
            unique_lock<mutex> lock(m_lock);
            m_cv.wait(lock, [this] { return m_stoped || !(m_tasks.empty()); });
            if (m_tasks.empty()) {
                return;
            }
            task = move(m_tasks.front());
            m_tasks.pop();
        }
        LOG_DEBUG("Handle one task");
        task();
        m_idleThreadNum++;
    }
}

2.2. 实际应用示例

多线程实现从1累加到10000,使用原子变量实现线程同步
main.cpp

/*
 * @Author WangYubo
 * @Date 09/17/2018
 * @Description
 */

#include "baseInstance . hpp "
#include "threadPool . hpp"
#include "log .hpp"
#include <atomic>
#include <vector>
#include <future>


using namespace std;

class testPool : public threadPool, public BaseInstance<testPool> {
   public:
    void init(int threadNum) { threadPool::init(threadNum, "testPool"); }
};

atomic<int> addNum;
atomic<int> result;
int test() {
    int add = 0;
    while (true) {
        add = addNum++;
        if (add > 10000) {
            return add;
        }
        result += add;
    }
}

int main(int argC, char *arg[]) {
    LOG_DEBUG("Hello");
    testPool &pool = testPool::getInstance();
    pool.init(5);
    vector<future<int>> ret;

    for (int i = 0; i < 20; i++) {
        ret.emplace_back(pool.commit(test));
    }
    for (auto &tmp : ret) {
        tmp.wait();
    }

    LOG_DEBUG("End, result %d", result.load());
    return 0;
}

3. 详解

乍一看很复杂,但是实现的功能却很强大,支持传入任意参数的任务函数。获取线程结果使用wait函数,充分运用C++11的各种新特性future、atomic(用来不加锁)、condition_variable等。
但是这个线程函数不可以传入引用,由于bind的特性,只允许传入指针才可以修改外部变量。传入的参数都将成为值拷贝的形式。

3.1. 类的结构

class threadPool {
    using Task = std::function<void(void)>;

   private:
    std::vector<std::thread> m_pool;   //线程池
    std::atomic<int> m_idleThreadNum;  //空闲线程数量
    std::atomic<bool> m_stoped;        //是否停止线程
    std::mutex m_lock;                 //线程池锁
    std::queue<Task> m_tasks;          //待执行任务
    std::condition_variable m_cv;      //线程控制
    int m_threadNum = 0;               //线程总数
    std::string m_poolName;            //线程池名称
    //线程执行函数
    void run();

   public:
    //构造函数
    threadPool() : m_stoped(true) {}
    ~threadPool();

    //添加线程函数
    template <class F, class... Args>
    auto commit(F &&f, Args &&... args) -> std::future<decltype(f(args...))> {
        using RetType = decltype(f(args...));

        if (m_stoped) {
            return std::future<RetType>();
        }

        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        auto ret = task->get_future();
        {
            std::lock_guard<std::mutex> lock(m_lock);
            m_tasks.emplace([task] { (*task)(); });
        }
        m_cv.notify_one();
        return ret;
    }

    // 初始化
    virtual void init(int threadNum, std::string name = "ThreadPool");
};

参数列表

  1. 使用一个vector存放各个线程,这个是真正意义上的池,线程使用c++的thread类
  2. 空闲线程数量和是否要停止线程标记,由于操作比较频繁,加锁会造成资源浪费,使用C++的atomic类定义原子变量
  3. 线程池的操作肯定是要加锁的,要用mutex。这个锁同时被用于控制线程,和condition_variable联合用。
  4. 外部传入的任务分发给线程池来做,这里就需要有一个任务队列,队列自然使用C++的queue,任务使用C++的function类定义一个void xxx(void)函数,为什么这样后面再解析
  5. 主要对线程的操作用到了condition_variable类,这里划重点,需要仔细理解,后面详解记得关注
  6. 线程总数和线程名字,纯属用来记录使用,内部打印线程池名称和线程数量使用

函数列表

3.2. 用到的C++11新特性详解

thread 线程类

#include <iostream>
#include <thread>

using namespace std;

void th_func()
{
    std::cout << "hello thread." << std::endl;
}

int main(int argc, char *argv[])
{
    std::thread t(th_func);
    t.join();

    return 0;
}

mutex 互斥量,用于加锁

在这里插入图片描述

示例

#include <thread>
#include <mutex>
#include <vector>
#include <iostream>
#include <algorithm>

std::mutex my_lock;

void add(int &num, int &sum){
    while(true){
        std::lock_guard<std::mutex> lock(my_lock);
        if (num > 100){
			break;
        }
        sum += num++;
    }
}

int main(){
    int sum = 0;
    int num = 0;
    std::vector<std::thread> ver;   //保存线程的vector
    for(int i = 0; i < 20; ++i){
        std::thread t = std::thread(add, std::ref(num), std::ref(sum));
        ver.emplace_back(std::move(t)); //保存线程
    }

    std::for_each(ver.begin(), ver.end(), std::mem_fn(&std::thread::join)); //join
    std::cout << sum << std::endl;
}

condition_variable 条件变量,线程管理主要用到的类

void threadPool::run() {
    while (true) {
        m_idleThreadNum--;
        Task task;
        {
            unique_lock<mutex> lock(m_lock);
            m_cv.wait(lock, [this] { return m_stoped || !(m_tasks.empty()); });
            if (m_tasks.empty()) {
                return;
            }
            task = move(m_tasks.front());
            m_tasks.pop();
        }
        LOG_DEBUG("Handle one task");
        task();
        m_idleThreadNum++;
    }
}

future 未来值获取

示例

int test1(promise<int> &promisObj) {
    sleep(10);
    LOG_DEBUG("set promise value");
	// promise变量赋值
    promisObj.set_value(10);
    sleep(10);
    LOG_DEBUG("return");
    return 0;
}

int main(int argC, char *arg[]) {
    LOG_DEBUG("Hello");
    testPool &pool = testPool::getInstance();
    pool.init(5);
    vector<future<int>> ret;
    // 声明promise
	promise<int> promisObj;
    // 起一个线程处理
    ret.emplace_back(pool.commit(test1, ref(promisObj)));
    // 获取future变量
	auto tmp = promisObj.get_future();
	// 等待变量赋值,这里会阻塞
	tmp.wait();
    LOG_DEBUG("get data %d", tmp.get());
    for (auto &tmp : ret) {
        tmp.wait();
    }

    return 0;
}

packaged_task

//添加线程函数
template <class F, class... Args>
auto commit(F &&f, Args &&... args) -> std::future<decltype(f(args...))> {
    using RetType = decltype(f(args...));

    if (m_stoped) {
        return std::future<RetType>();
    }

    auto task = std::make_shared<std::packaged_task<RetType()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    auto ret = task->get_future();
    {
        std::lock_guard<std::mutex> lock(m_lock);
        m_tasks.emplace([task] { (*task)(); });
    }
    m_cv.notify_one();
    return ret;
}

forward 转移赋值

bind 绑定函数和参数列表

void fun_2(int &a,int &b)
{
    a++;
    b++;
    LOG_DEBUG("print a = %d, b = %d",a, b); 	// a = 4, b = 3
}

int main(int argC, char *arg[]) {
    LOG_DEBUG("Hello");
    int m = 2;
    int n = 3;
    auto f4 = std::bind(fun_2, n, placeholders::_1); //表示绑定fun_2的第一个参数为n, fun_2的第二个参数由调用f4的第一个参数(_1)指定。
    f4(m);
    LOG_DEBUG("m %d", m);	// 3
    LOG_DEBUG("n %d", n);	// 3
    return 0;
}

3.3. 具体实现函数详解

最新C/C++linux服务器开发/架构师面试题、学习资料、教学视频和学习路线脑图(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享>>>>资料获取
在这里插入图片描述

标签:std,task,int,lock,C++,LinuxC,future,线程
来源: https://blog.csdn.net/weixin_52622200/article/details/117130648