Libevent基础之 Reactor模式
作者:互联网
个人作品,未经允许禁止转载!
该代码意在说明reactor的原理,没有深究细节。
详见注释
#include<stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <ctime>
#include <netinet/in.h>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <arpa/inet.h>
#include <unistd.h>
#include <iostream>
using namespace std;
#define MAX_BUFFER_SIZE 1024
#define MAX_EVENTS_NUMBER (1024)
#define SERVER_PORT 9998
#define MAX_PENDING_CONNECTIONS 1024
// Per-descriptor reactor record. A pointer to this record is stored in
// epoll_event.data.ptr when the descriptor is registered, so that when epoll
// reports an event the dispatcher gets back the descriptor's saved state and
// its callback. This hand-off is the core of the reactor pattern.
struct reactor_event_s{
    // Handler signature: (descriptor, triggered epoll events, user pointer).
    using callback_t = void (*)(int fd, int events, void *arg);

    int fd;               // Descriptor this record belongs to
    int events;           // Registered interest: EPOLLIN or EPOLLOUT
    void *arg;            // User-defined pointer handed back to the callback
    callback_t callback;  // Invoked when a registered event fires
    int valid;            // 1 while the slot is in use, 0 when it is free

    // Data received so far and its length. When one wake-up does not deliver a
    // complete message, the partial data can wait here until the next one.
    // Author's design note: in a stateful protocol, looking up per-fd history
    // is expensive; epoll helps by carrying this pointer, though the author
    // notes an O(log n) cost remains. Coroutines are suggested as an
    // alternative, since their context already lives on their own stack.
    char buffer[MAX_BUFFER_SIZE];
    int len;

    // Timestamp of the last activity, usable for idle-timeout handling.
    long last_active;
};
// The process-wide epoll instance; created in main.
int epoll_fd;
// Table of reactor records.
// Indices [0, MAX_EVENTS_NUMBER) hold client connections. The extra slot at
// index MAX_EVENTS_NUMBER is reserved for the listening socket, which
// init_listen_socket registers there; without the "+ 1" that access would be
// one element past the end of the array.
// A more suitable structure (e.g. a linked list) could replace the flat array.
reactor_event_s reactor_events[MAX_EVENTS_NUMBER + 1];
// Create the listening socket on `port` and register it with the epoll
// instance `epfd`. Its callback is acceptconn.
void init_listen_socket(int epfd, unsigned short port);

// Fill in the reactor record at `re` with the descriptor, callback and
// user pointer.
void event_set(reactor_event_s *re, int fd, void(*callback)(int, int, void*), void* arg);

// Register (or re-register) the record `re` with epoll for `events`.
void event_add(int epfd, int events, reactor_event_s* re);

// Remove the record `re` from epoll and mark its slot free.
void event_del(int epfd, reactor_event_s* re);

// Callback for the listening socket: accepts a connection and registers a
// new reactor record for it.
void acceptconn(int listen_fd, int events, void* arg);

// Callback for client sockets: receives data and echoes it back.
void recv_data(int fd, int events, void* arg);

// Minimal helper that sends `len` bytes of `data` on `fd`.
void send_data(int fd, char* data, int len);

// Main-loop flag; the event loop runs while this is true.
bool running = true;
int main(int argc, char* argv[]){
epoll_fd = epoll_create(MAX_EVENTS_NUMBER + 1);
init_listen_socket(epoll_fd, SERVER_PORT);
epoll_event events[MAX_EVENTS_NUMBER + 1];
while(running){
long now = time(nullptr);
// Check over 10s without data, disconnect
for (int i=0; i < MAX_EVENTS_NUMBER; i++){
if (reactor_events[i].valid == 1 && now - reactor_events[i].last_active >= 10){
close(reactor_events[i].fd);
printf("[fd=%d] timeout. disconnect...\n", reactor_events[i].fd);
event_del(epoll_fd, &reactor_events[i]);
}
}
// Check for new event
int nfd = epoll_wait(epoll_fd, events, MAX_EVENTS_NUMBER + 1, 0);
if (nfd < 0){
printf("epoll_awit");
break;
}
else if (nfd > 0){
// printf("%d\n", nfd);
}
for (int i=0; i<nfd; i++){
reactor_event_s* re= static_cast<reactor_event_s*>(events[i].data.ptr);
if ((events[i].events & EPOLLIN) && (re->events & EPOLLIN))
re->callback(re->fd, events[i].events, re->arg);
if ((events[i].events & EPOLLOUT) && (re->events & EPOLLOUT))
re->callback(re->fd, events[i].events, re->arg);
}
}
}
// Initialise the reactor record at `re` for descriptor `fd`.
//   re       - record to fill in
//   fd       - descriptor the record describes
//   callback - handler to run when a registered event fires
//   arg      - user pointer passed back to the handler
// The receive buffer is emptied, the interest mask is cleared and the activity
// timestamp is set to the current time. `valid` is left untouched: event_add
// reads it to choose between adding and modifying the epoll registration.
void event_set(reactor_event_s *re, int fd, void(*callback)(int, int, void*), void* arg){
    reactor_event_s& record = *re;

    // Identity and dispatch information.
    record.fd = fd;
    record.callback = callback;
    record.arg = arg;

    // No interest registered yet; event_add fills this in.
    record.events = 0;

    // Start with an empty receive buffer.
    record.len = 0;
    record.buffer[0] = '\0';

    record.last_active = time(nullptr);
}
// Register the record `re` with the epoll instance `epfd` for `events`.
// A record whose `valid` flag is already 1 is re-registered with
// EPOLL_CTL_MOD; otherwise it is added with EPOLL_CTL_ADD and marked valid.
// The record's address is stored in epoll_event.data.ptr so the dispatcher
// can recover it. The outcome is reported on stdout.
void event_add(int epfd, int events, reactor_event_s* re){
    const bool already_registered = (re->valid == 1);
    const int op = already_registered ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    if (!already_registered)
        re->valid = 1;

    re->events = events;

    epoll_event ev{};
    ev.events = events;
    ev.data.ptr = re;

    const int rc = epoll_ctl(epfd, op, re->fd, &ev);
    if (rc < 0){
        printf("%s: add event failed!\n", __func__);
    }
    else{
        printf("%s: add event success!\n", __func__);
    }
}
void init_listen_socket(int epfd, unsigned short port){
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(listen_fd, F_SETFL, O_NONBLOCK);
event_set(&reactor_events[MAX_EVENTS_NUMBER], listen_fd, acceptconn, &reactor_events[MAX_EVENTS_NUMBER]);
event_add(epfd, EPOLLIN, &reactor_events[MAX_EVENTS_NUMBER]);
sockaddr_in addrin{};
addrin.sin_family = AF_INET;
addrin.sin_addr.s_addr = INADDR_ANY;
addrin.sin_port = htons(port);
if (bind(listen_fd, (sockaddr*)&addrin, sizeof(addrin)) != 0){
perror("bind");
exit(-1);
}
if (listen(listen_fd, MAX_PENDING_CONNECTIONS) != 0){
perror("listen");
exit(-1);
}
return ;
}
void acceptconn(int listen_fd, int events, void* callback){
sockaddr_in caddr;
int client_fd;
socklen_t len = sizeof(caddr);
if ((client_fd = accept(listen_fd, (sockaddr*)&caddr, &len)) == -1){
if (errno != EAGAIN && errno != EINTR){
// handle error here
}
printf("[Error]: Failed to accept in %s: %s\n", __func__, strerror(errno));
return;
}
int i;
// 为什么这里使用 do while(0): https://www.cnblogs.com/lizhenghn/p/3674430.html
do{
for (i=0; i < MAX_EVENTS_NUMBER; i++){
if (reactor_events[i].valid == 0)
break;
}
if (i == MAX_EVENTS_NUMBER){
printf("%s: max connection limit exceeded!\n", __func__);
break;
}
if (fcntl(client_fd, F_SETFL, O_NONBLOCK) < 0){
printf("%s: fnctl set nonblocking failed!\n", __func__);
break;
}
event_set(&reactor_events[i], client_fd, recv_data, &reactor_events[i]);
event_add(epoll_fd, EPOLLIN, &reactor_events[i]);
} while(0);
printf("New connection [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(caddr.sin_addr), ntohs(caddr.sin_port), reactor_events[i].last_active, i);
}
void recv_data(int fd, int events, void* arg){
reactor_event_s *re = static_cast<reactor_event_s*>(arg);
re->last_active = time(nullptr);
int len;
len = recv(fd, re->buffer, MAX_BUFFER_SIZE, 0);
re->len = len;
re->buffer[len] = '\0';
if (len > 0) {
printf("Recv <= %s. Sending Back...\n", re->buffer);
send_data(fd, re->buffer, len);
}
return ;
}
// Send `len` bytes starting at `data` on socket `fd`.
//   fd   - connected socket descriptor
//   data - bytes to send
//   len  - number of bytes to send; nothing is sent when len <= 0
// Keeps calling send until everything has been written, retrying on EINTR.
// MSG_NOSIGNAL is used so that writing to a peer that has already closed
// yields an EPIPE error instead of a process-terminating SIGPIPE. On any other
// failure (including EAGAIN on a non-blocking socket) the remaining bytes are
// dropped and the failure is reported on stdout.
void send_data(int fd, char* data, int len){
    int sent = 0;
    while (sent < len){
        const ssize_t n = send(fd, data + sent, static_cast<size_t>(len - sent), MSG_NOSIGNAL);
        if (n > 0){
            sent += static_cast<int>(n);
            continue;
        }
        if (n < 0 && errno == EINTR)
            continue;  // interrupted before any data was sent: retry
        if (n < 0)
            printf("%s: send failed on fd=%d after %d/%d bytes: %s\n", __func__, fd, sent, len, strerror(errno));
        return;
    }
}
// Remove the record `re` from the epoll instance `epfd` and mark its slot
// free. Does nothing when the slot is not currently marked valid. The result
// of epoll_ctl is not checked.
void event_del(int epfd, reactor_event_s* re){
    if (re->valid == 1){
        // EPOLL_CTL_DEL ignores the event argument, so a zeroed one is enough.
        // A non-null pointer is still passed because, per epoll_ctl(2),
        // kernels before 2.6.9 required one for this operation.
        epoll_event unused{};
        epoll_ctl(epfd, EPOLL_CTL_DEL, re->fd, &unused);
        re->valid = 0;
    }
}
标签:Reactor,int,模式,event,re,fd,Libevent,events,reactor 来源: https://blog.csdn.net/WilliamCode/article/details/120675690