其他分享
首页 > 其他分享> > 基于libevent线程池实现

基于libevent线程池实现

作者:互联网

XThreadPool.h

#ifndef XTHREADPOOL_H
#define XTHREADPOOL_H
#include <vector>

class XThread;
class XTask;
class XThreadPool
{
private:
    int threadCount = 0; // 线程数量
    int lastThread = -1;
    std::vector<XThread *> threads;

public:
    // 单例模式
    static XThreadPool *Get()
    {
        static XThreadPool p;
        return &p;
    }

    // 初始化所有线程并启动线程
    void Init(int threadCount);

    // 分发线程
    void Dispatch(XTask *task);
};
#endif

XThreadPool.cpp

#include <XTask.h>
#include <XThread.h>
#include <XThreadPool.h>
#include <iostream>
#include <thread>

using namespace std;

void XThreadPool::Init(int threadCount)
{
    this->threadCount = threadCount;
    this->lastThread = -1;
    for (int i = 0; i < threadCount; i++)
    {
        XThread *t = new XThread();
        t->id = i + 1; // 传递线程编号
        cout << "create thread " << i << endl;
        t->Start(); // 启动线程
        threads.push_back(t);
        this_thread::sleep_for(chrono::microseconds(10)); // 10ms
    }
}

// 分发线程
void XThreadPool::Dispatch(XTask *task)
{
    // 轮询
    if (!task)
    {
        return;
    }
    int tid = (lastThread + 1) % threadCount;
    lastThread = tid;
    XThread *t = threads[tid];

    t->AddTask(task);
    // 激活线程
    t->Activate();
}

XThread.h

#ifndef XTHREAD_H
#define XTHREAD_H
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <list>
#include <mutex>
class XTask;
class XThread
{
public:
    // 启动线程
    void Start();

    // 线程入口函数
    void Main();

    //安装线程,初始化event_base和管道监听事件,用于激活线程
    bool Setup();

    // 收到主线程发出的激活消息,(线程池的分发调用)
    void Notify(evutil_socket_t fd, short which);

    // 线程激活
    void Activate();

    // 添加处理的任务,一个线程可以同时处理多个任务,共用一个event_base
    void AddTask(XTask *t);

    int id = 0; // 线程编号
private:
    int notify_send_fd = 0;
    event_base *base = 0;

    std::list<XTask *> tasks; // 任务列表
    std::mutex tasks_mutex;   // 线程安全互斥
};
#endif

XThread.cpp

#include "XThread.h"
#include <XTask.h>
#include <event2/buffer.h>
#include <event2/event.h>
#include <iostream>
#include <thread>
#include <unistd.h>

using namespace std;

// 激活线程任务的事件回调函数
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{
    XThread *t = (XThread *)arg;
    t->Notify(fd, which);
}

void XThread::Notify(evutil_socket_t fd, short which)
{
    // 水平触发,只要没有接收完成,会再次进来
    char buf[2] = {0};
    int re = read(fd, buf, 1);
    if (re <= 0)
    {
        return;
    }
    cout << "id = " << id << buf << endl;
    XTask *task = NULL;

    // 获取任务,并初始化任务
    tasks_mutex.lock();
    if (tasks.empty())
    {
        tasks_mutex.unlock();
        return;
    }
    task = tasks.front(); // 先进先出
    tasks.pop_front();
    tasks_mutex.unlock();
    task->Init();
}

// 添加处理的任务,一个线程可以同时处理多个任务,共用一个event_base
void XThread::AddTask(XTask *t)
{
    if (!t)
        return;
    t->set_base(this->base);
    tasks_mutex.lock();
    tasks.push_back(t);
    tasks_mutex.unlock();
}

// 线程激活
void XThread::Activate()
{
    int re = write(this->notify_send_fd, "c", 1);
    if (re <= 0)
    {
        cerr << "Activate failed" << endl;
    }
}

// 启动线程
void XThread::Start()
{
    Setup();
    // 启动线程
    thread th(&XThread::Main, this);

    // 断开与主线程联系
    th.detach();
}

// 线程入口函数
void XThread::Main()
{
    cout << id << "void XThread::Main() begin" << endl;
    if (!base)
    {
        cerr << "Thread::Main() failed! base is null" << endl;
    }
    event_base_dispatch(base);
    event_base_free(base);
    cout << id << "void XThread::Main() end" << endl;
}

//安装线程,初始化event_base和管道监听事件,用于激活线程
bool XThread::Setup()
{
    // linux用管道 创建为管道,不能用send recv ,用read write
    // fds[0] 读 fds[1]写
    int fds[2];
    if (pipe(fds))
    {
        cerr << "pipe failed" << endl;
        return false;
    }

    // 读取绑定到event事件中,写入要保存
    notify_send_fd = fds[1];

    // 创建libevent上下文,无锁
    event_config *ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    this->base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
    if (!base)
    {
        cerr << "event_base_new_with_config failed in thread" << endl;
    }

    // 添加管道监听事件,用于激活线程执行任务
    event *ev = event_new(base, fds[0], EV_READ | EV_PERSIST, NotifyCB, this);
    event_add(ev, NULL);

    return true;
}

XTask.h

#ifndef XTASK_H
#define XTASK_H
#include <event2/event.h>
class XTask
{
public:
//初始化任务
    virtual bool Init() = 0;
    
    void set_sock(int sock)
    {
        this->sock = sock;
    }

    void set_threadid(int thread_id)
    {
        this->thread_id = thread_id;
    }

    int thread_idfunc()
    {
        return thread_id;
    }

    int sockfunc()
    {
        return sock;
    }

    event_base *basefunc()
    {
        return base;
    }

    void set_base(event_base *base)
    {
        this->base = base;
    }

private:
    event_base *base = 0;
    int sock = 0;
    int thread_id = 0;

    
};
#endif

标签:基于,int,void,XThread,base,线程,libevent,include
来源: https://www.cnblogs.com/meng-chao/p/16142827.html