基于libevent线程池实现
作者:互联网
XThreadPool.h
#ifndef XTHREADPOOL_H
#define XTHREADPOOL_H
#include <vector>
class XThread;
class XTask;
class XThreadPool
{
private:
int threadCount = 0; // 线程数量
int lastThread = -1;
std::vector<XThread *> threads;
public:
// 单例模式
static XThreadPool *Get()
{
static XThreadPool p;
return &p;
}
// 初始化所有线程并启动线程
void Init(int threadCount);
// 分发线程
void Dispatch(XTask *task);
};
#endif
XThreadPool.cpp
#include <XTask.h>
#include <XThread.h>
#include <XThreadPool.h>
#include <iostream>
#include <thread>
using namespace std;
void XThreadPool::Init(int threadCount)
{
this->threadCount = threadCount;
this->lastThread = -1;
for (int i = 0; i < threadCount; i++)
{
XThread *t = new XThread();
t->id = i + 1; // 传递线程编号
cout << "create thread " << i << endl;
t->Start(); // 启动线程
threads.push_back(t);
this_thread::sleep_for(chrono::microseconds(10)); // 10ms
}
}
// 分发线程
void XThreadPool::Dispatch(XTask *task)
{
// 轮询
if (!task)
{
return;
}
int tid = (lastThread + 1) % threadCount;
lastThread = tid;
XThread *t = threads[tid];
t->AddTask(task);
// 激活线程
t->Activate();
}
XThread.h
#ifndef XTHREAD_H
#define XTHREAD_H
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <list>
#include <mutex>
class XTask;
class XThread
{
public:
// 启动线程
void Start();
// 线程入口函数
void Main();
//安装线程,初始化event_base和管道监听事件,用于激活线程
bool Setup();
// 收到主线程发出的激活消息,(线程池的分发调用)
void Notify(evutil_socket_t fd, short which);
// 线程激活
void Activate();
// 添加处理的任务,一个线程可以同时处理多个任务,共用一个event_base
void AddTask(XTask *t);
int id = 0; // 线程编号
private:
int notify_send_fd = 0;
event_base *base = 0;
std::list<XTask *> tasks; // 任务列表
std::mutex tasks_mutex; // 线程安全互斥
};
#endif
XThread.cpp
#include "XThread.h"
#include <XTask.h>
#include <event2/buffer.h>
#include <event2/event.h>
#include <iostream>
#include <thread>
#include <unistd.h>
using namespace std;
// 激活线程任务的事件回调函数
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{
XThread *t = (XThread *)arg;
t->Notify(fd, which);
}
void XThread::Notify(evutil_socket_t fd, short which)
{
// 水平触发,只要没有接收完成,会再次进来
char buf[2] = {0};
int re = read(fd, buf, 1);
if (re <= 0)
{
return;
}
cout << "id = " << id << buf << endl;
XTask *task = NULL;
// 获取任务,并初始化任务
tasks_mutex.lock();
if (tasks.empty())
{
tasks_mutex.unlock();
return;
}
task = tasks.front(); // 先进先出
tasks.pop_front();
tasks_mutex.unlock();
task->Init();
}
// 添加处理的任务,一个线程可以同时处理多个任务,共用一个event_base
void XThread::AddTask(XTask *t)
{
if (!t)
return;
t->set_base(this->base);
tasks_mutex.lock();
tasks.push_back(t);
tasks_mutex.unlock();
}
// 线程激活
void XThread::Activate()
{
int re = write(this->notify_send_fd, "c", 1);
if (re <= 0)
{
cerr << "Activate failed" << endl;
}
}
// 启动线程
void XThread::Start()
{
Setup();
// 启动线程
thread th(&XThread::Main, this);
// 断开与主线程联系
th.detach();
}
// 线程入口函数
void XThread::Main()
{
cout << id << "void XThread::Main() begin" << endl;
if (!base)
{
cerr << "Thread::Main() failed! base is null" << endl;
}
event_base_dispatch(base);
event_base_free(base);
cout << id << "void XThread::Main() end" << endl;
}
//安装线程,初始化event_base和管道监听事件,用于激活线程
bool XThread::Setup()
{
// linux用管道 创建为管道,不能用send recv ,用read write
// fds[0] 读 fds[1]写
int fds[2];
if (pipe(fds))
{
cerr << "pipe failed" << endl;
return false;
}
// 读取绑定到event事件中,写入要保存
notify_send_fd = fds[1];
// 创建libevent上下文,无锁
event_config *ev_config = event_config_new();
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
this->base = event_base_new_with_config(ev_config);
event_config_free(ev_config);
if (!base)
{
cerr << "event_base_new_with_config failed in thread" << endl;
}
// 添加管道监听事件,用于激活线程执行任务
event *ev = event_new(base, fds[0], EV_READ | EV_PERSIST, NotifyCB, this);
event_add(ev, NULL);
return true;
}
XTask.h
#ifndef XTASK_H
#define XTASK_H
#include <event2/event.h>
class XTask
{
public:
//初始化任务
virtual bool Init() = 0;
void set_sock(int sock)
{
this->sock = sock;
}
void set_threadid(int thread_id)
{
this->thread_id = thread_id;
}
int thread_idfunc()
{
return thread_id;
}
int sockfunc()
{
return sock;
}
event_base *basefunc()
{
return base;
}
void set_base(event_base *base)
{
this->base = base;
}
private:
event_base *base = 0;
int sock = 0;
int thread_id = 0;
};
#endif
标签:基于,int,void,XThread,base,线程,libevent,include 来源: https://www.cnblogs.com/meng-chao/p/16142827.html