libevent(十二)bufferevent filter zlib 压缩通信(二)
作者:互联网
使用zlib进行文件传输:
客户端:读取文件 -> 输出过滤器进行数据压缩 -> 发送数据
服务端:读取文件 -> 输入过滤器进行数据解压-> 存储数据
main.cpp
#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <zlib.h>
using namespace std;
int main()
{
#ifdef _WIN32
//初始化socket库
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
#else
//忽略管道信号,发送数据给已关闭的socket
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
return 1;
#endif
std::cout << "test server!\n";
//创建libevent的上下文
event_base* base = event_base_new();
if (base)
{
cout << "event_base_new success!" << endl;
}
void Server(event_base * base);
Server(base);
void Client(event_base * base);
Client(base);
//事件分发处理
if (base)
event_base_dispatch(base);
if (base)
event_base_free(base);
#ifdef _WIN32
WSACleanup();
#endif
return 0;
}
zlib_server.cpp
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <string>
#include <zlib.h>
using namespace std;
#define SPORT 5001
struct Status
{
bool start = false;
FILE* fp = 0;
z_stream* p;
int recv_num = 0;
int write_num = 0;
~Status()
{
if (p)
{
inflateEnd(p);
}
delete p;
p = 0;
if (fp)
{
fclose(fp);
}
fp = 0;
}
};
bufferevent_filter_result filter_in(evbuffer* s, evbuffer* d,
ev_ssize_t limit, bufferevent_flush_mode mode, void* arg)
{
//1 接收客户端发送的文件名
Status* status = (Status*)arg;
if (!status->start)
{
char data[1024] = { 0 };
int len = evbuffer_remove(s, data, sizeof(data) - 1);
evbuffer_add(d, data, len);
return BEV_OK;
}
//解压
evbuffer_iovec v_in[1];
//读取数据,不清理缓冲
int n = evbuffer_peek(s, -1, NULL, v_in, 1);
if (n <= 0)
{
return BEV_NEED_MORE;
}
z_stream* p = status->p;
//zlib 输入数据大小
p->avail_in = v_in[0].iov_len;
//zlib 输入数据地址
p->next_in = (Byte*)v_in[0].iov_base;
//申请输出空间大小
evbuffer_iovec v_out[1];
evbuffer_reserve_space(d, 4096, v_out, 1);
//zlib 输出空间大小
p->avail_out = v_out[0].iov_len;
//zlib 输出空间地址
p->next_out = (Byte*)v_out[0].iov_base;
//解压数据
int re = inflate(p, Z_SYNC_FLUSH);
if (re != Z_OK)
{
cerr << "inflate failed!" << endl;
}
//解压用了多少数据,从source evbuffer中移除
//p->avail_in 未处理数据大小
int n_read = v_in[0].iov_len - p->avail_in;
//解压后数据大小 传入des evbuffer
//p->avail_out 剩余空间大小
int n_write = v_out[0].iov_len - p->avail_out;
//移除source evbuffer中数据
evbuffer_drain(s, n_read);
//传入des evbuffer
v_out[0].iov_len = n_write;
evbuffer_commit_space(d, v_out, 1);
cout << "Server n_read " << n_read << "\t n_write " << n_write << endl;
status->recv_num += n_read;
status->write_num += n_write;
return BEV_OK;
}
static void read_cb(bufferevent* bev, void* arg)
{
Status* status = (Status*)arg;
if (!status->start)
{
//001接收文件名
char data[1024] = { 0 };
bufferevent_read(bev, data, sizeof(data) - 1);
string out = "out/";
out += data;
//打开写入文件
status->fp = fopen(out.c_str(), "wb");
if (!status->fp)
{
cout << "server open " << out << " failed!" << endl;
return;
}
//002 回复OK
bufferevent_write(bev, "OK", 2);
status->start = true;
return;
}
do
{
//写入文件
char data[1024] = { 0 };
int len = bufferevent_read(bev, data, sizeof(data));
if (len >= 0)
{
fwrite(data, 1, len, status->fp);
fflush(status->fp);
}
} while (evbuffer_get_length(bufferevent_get_input(bev)) > 0);
}
static void event_cb(bufferevent* bev, short events, void* arg)
{
cout << "server event_cb " << events << endl;
Status* status = (Status*)arg;
if (events & BEV_EVENT_EOF)
{
cout << "server event BEV_EVENT_EOF success!" << endl;
cout << "Server recv = " << status->recv_num << endl;
cout << "Server write = " << status->write_num << endl;
delete status;
bufferevent_free(bev);
}
}
static void listen_cb(struct evconnlistener* e, evutil_socket_t s, struct sockaddr* a, int socklen, void* arg)
{
cout << "listen_cb" << endl;
event_base* base = (event_base*)arg;
//1 创建一个bufferevent 用来通信
bufferevent* bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE);
Status* status = new Status();
status->p = new z_stream();
inflateInit(status->p);
//2 添加过滤 并设置输入回调
bufferevent* bev_filter = bufferevent_filter_new(bev,
filter_in, // 输入过滤函数
0, // 输出过滤
BEV_OPT_CLOSE_ON_FREE, // 关闭filter同时管理bufferevent
0, // 清理回调
status // 传递参数
);
//3 设置回调 读取 事件(处理连接断开)
bufferevent_setcb(bev_filter, read_cb, 0, event_cb, status);
bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}
void Server(event_base* base)
{
cout << "----begin Server----" << endl;
//监听端口(socket ,bind,listen 绑定事件)
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
evconnlistener* ev = evconnlistener_new_bind(base, // libevent的上下文
listen_cb, // 接收到连接的回调函数
base, // 回调函数获取的参数 arg
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, // 地址重用,evconnlistener关闭同时关闭socket
10, // 连接队列大小,对应listen函数
(sockaddr*)&sin, // 绑定的地址和端口
sizeof(sin)
);
}
zlib_client.cpp
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <zlib.h>
using namespace std;
#define FILEPATH "001.txt"
struct ClientStatus
{
FILE* fp = 0;
bool end = false;
bool startSend = false;
z_stream* z_output = 0;
int readNum = 0;
int sendNum = 0;
~ClientStatus()
{
if (z_output)
{
deflateEnd(z_output);
}
delete z_output;
z_output = 0;
if (fp)
{
fclose(fp);
}
fp = 0;
}
};
bufferevent_filter_result filter_out(evbuffer* s, evbuffer* d,
ev_ssize_t limit, bufferevent_flush_mode mode, void* arg)
{
ClientStatus* sta = (ClientStatus*)arg;
//压缩文件,发送文件名消息去掉
if (!sta->startSend)
{
char data[1024] = { 0 };
int len = evbuffer_remove(s, data, sizeof(data));
evbuffer_add(d, data, len);
return BEV_OK;
}
//开始压缩文件(取出buffer中数据的引用)
evbuffer_iovec v_in[1];
int n = evbuffer_peek(s, -1, 0, v_in, 1);
if (n<=0)
{
//调用write回调, 清理空间
if (sta->end)
{
return BEV_OK;
}
//没有数据 BEV_NEED_MORE 不会进入写入回调
return BEV_NEED_MORE;
}
//记下zlib上下文
z_stream* p = sta->z_output;
if (!p)
{
return BEV_ERROR;
}
//zlib 输入数据大小
p->avail_in = v_in[0].iov_len;
//zlib 输入数据地址
p->next_in = (Byte*)v_in[0].iov_base;
//申请输出空间大小
evbuffer_iovec v_out[1];
evbuffer_reserve_space(d, 4096, v_out, 1);
//zlib 输出空间大小
p->avail_out = v_out[0].iov_len;
//zlib 输出空间地址
p->next_out = (Byte*)v_out[0].iov_base;
//压缩
int re = deflate(p, Z_SYNC_FLUSH);
if (re != Z_OK)
{
cerr << "deflate failed!" << endl;
}
//压缩用了多少数据,从source evbuffer中移除
//p->avail_in 未处理数据大小
int n_read = v_in[0].iov_len - p->avail_in;
//压缩后数据大小 传入des evbuffer
//p->avail_out 剩余空间大小
int n_write = v_out[0].iov_len - p->avail_out;
//移除source evbuffer中数据
evbuffer_drain(s, n_read);
//传入des evbuffer
v_out[0].iov_len = n_write;
evbuffer_commit_space(d, v_out, 1);
cout << "Client n_read " << n_read << "\t n_write " << n_write << endl;
sta->readNum += n_read;
sta->sendNum += n_write;
return BEV_OK;
}
static void client_read_cb(bufferevent* bev, void* arg)
{
ClientStatus* sta = (ClientStatus*)arg;
//002 接收服务端发送的OK回复
char data[1024] = { 0 };
int len = bufferevent_read(bev, data, sizeof(data) - 1);
if (strcmp(data, "OK") == 0)
{
cout << data << endl;
sta->startSend = true;
//开始发送文件,触发写入回调
bufferevent_trigger(bev, EV_WRITE, 0);
}
else
{
bufferevent_free(bev);
}
cout << "client_read_cb " << len << endl;
}
static void client_write_cb(bufferevent* bev, void* arg)
{
cout << "client_write_cb" << endl;
ClientStatus* s = (ClientStatus*)arg;
FILE* fp = s->fp;
//判断什么时候清理资源
if (s->end)
{
//判断缓冲是否有数据,如果有刷新
//获取过滤器绑定的buffer
bufferevent* be = bufferevent_get_underlying(bev);
//获取输出缓冲及其大小
evbuffer* evb = bufferevent_get_output(be);
int len = evbuffer_get_length(evb);
if (len <= 0)
{
cout << "Client readNum = " << s->readNum << endl;
cout << "Client sendNum = " << s->sendNum << endl;
//立刻清理 如果缓冲有数据,不会发送
bufferevent_free(bev);
delete s;
return;
}
//刷新缓冲
bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
return;
}
if (!fp)return;
//读取文件
char data[1024] = { 0 };
int len = fread(data, 1, sizeof(data), fp);
if (len <= 0)
{
fclose(fp);
s->end = true;
//刷新缓冲
bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
return;
}
//发送文件
bufferevent_write(bev, data, len);
}
static void client_event_cb(bufferevent* be, short events, void* arg)
{
cout << "client_event_cb " << events << endl;
if (events & BEV_EVENT_CONNECTED)
{
cout << "client BEV_EVENT_CONNECTED" << endl;
//001 发送文件名
bufferevent_write(be, FILEPATH, strlen(FILEPATH));
//初始化文件句柄
FILE* fp = fopen(FILEPATH, "rb");
if (!fp)
{
cout << "open file " << FILEPATH << " failed!" << endl;
}
ClientStatus* s = new ClientStatus();
s->fp = fp;
//初始化zlib上下文
s->z_output = new z_stream();
deflate(s->z_output, Z_DEFAULT_COMPRESSION);
//创建输出过滤
bufferevent* bev_filter = bufferevent_filter_new(be, 0, filter_out,
BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS, 0, s);
//设置读取、写入和事件的回调
bufferevent_setcb(bev_filter, client_read_cb, client_write_cb, client_event_cb, s);
bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}
}
void Client(event_base* base)
{
cout << "-----begin Client-----" << endl;
//连接服务端
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(5001);
evutil_inet_pton(AF_INET, "127.0.0.1", &sin.sin_addr.s_addr);
bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
//只绑定事件回调,用来确认连接成功
bufferevent_enable(bev, EV_READ | EV_WRITE);
bufferevent_setcb(bev, 0, 0, client_event_cb, 0);
bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin));
//接收回复确认OK
}
Makefile
test_buffer_filter_zlib:main.cpp zlib_server.cpp zlib_client.cpp
g++ $^ -o $@ -levent -lz
./$@
clean:
rm -rf test_buffer_filter_zlib
rm -rf *.o
标签:bufferevent,zlib,len,filter,evbuffer,include,data,out 来源: https://blog.csdn.net/wsp_1138886114/article/details/120996993