编程语言
首页 > 编程语言 > C++ IOCP模型代码(改之小猪代码)

C++ IOCP模型代码(改之小猪代码)

作者:互联网

昨天看了一下小猪的 IOCP 示例代码,下载下来学习之后,发现其中存在内存泄漏,以及工作线程出错后仍一直等待 I/O 完成、从而一直休眠的问题。

于是我按照自己的代码风格修改了这些问题,修改后不再出现内存泄漏,也不会再因常见的线程错误而一直休眠。代码如下,感兴趣的小伙伴可以自行复制、运行和测试。

推荐阅读这篇转载自小猪的文章(有配图):https://blog.csdn.net/weixin_30360497/article/details/95383219

iocp_model.h

  1 #pragma once
  2 #include <WinSock2.h>
  3 #include <MSWSock.h>
  4 #include <Ws2tcpip.h>
  5 #include <cstdint>
  6 #include <vector>
  7 #include <map>
  8 #include "thread_pool.h"
  9 #include "lock_free_stack.h"
 10 
 11 namespace iocp {
 12     using fun1 = typename std::function<void(SOCKADDR_IN*, char*)>;    // data callback: (peer address, data buffer)
 13 
 14     // Size in bytes of each per-operation I/O buffer
 15     const uint16_t buff_len = 4096;
 16     // Number of accept requests posted at startup (tune to the expected traffic)
 17     const uint8_t post_accept_num = 8;
 18     // Forward declaration of the iocp_model class
 19     class iocp_model;
 20 
 21     // Operation type -- _OPERATION_TYPE
 22     enum class operation_type {
 23         null,
 24         accept,        // accept a connection
 25         send,        // send
 26         recv,        // receive
 27         close,        // close
 28     };
 29 
 30     // Per-I/O operation context -- _PER_IO_CONTEXT
 31     struct io_operate {
 32         OVERLAPPED overlapped;            // overlapped I/O information
 33         SOCKET accept;                    // connection socket
 34         WSABUF wsabuf;                    // descriptor of the data buffer passed to the Winsock calls
 35         char buffer[buff_len];            // data buffer
 36         operation_type type;            // kind of I/O operation
 37         io_operate();
 38         ~io_operate();
 39         // Zero the data buffer
 40         void reset_buf();
 41     };
 42 
 43     // Per-socket context -- _PER_SOCKET_CONTEXT
 44     struct io_data {
 45         SOCKET socket;                            // socket handle
 46         SOCKADDR_IN addr_info;                    // address information
 47         std::vector<io_operate*> operate_data;    // I/O operation contexts
 48         io_data();
 49         ~io_data();
 50         // Allocate a new I/O operation context, store it in operate_data and return it
 51         io_operate* add_io_operate();
 52         // Remove the given I/O operation context from operate_data
 53         void del_io_operate(io_operate* data);
 54     };
 55 
 56     // IOCP server model
 57     class iocp_model {
 58     private:
 59         thread_pool* _thread_pool;            // thread pool
 60         io_data _listen;                    // listening socket context
 61         std::map<SOCKET, io_data*> _client;    // client contexts, keyed by socket
 62         uint8_t _work_num;                    // number of worker threads
 63         HANDLE    _exit;                        // exit event handle
 64         HANDLE    _complete_port;                // completion port handle
 65         io_operate _io_operate;                // I/O marker
 66         CRITICAL_SECTION _critical;            // critical section
 67         LPFN_ACCEPTEX _acceptex_ptr;                    // AcceptEx function pointer
 68         LPFN_GETACCEPTEXSOCKADDRS _acceptex_addr_ptr;    // GetAcceptExSockaddrs function pointer
 69         std::shared_ptr<fun1> _data_process;            // handler for received data
 70 
 71         // Initialise the listening socket
 72         void init_socket(uint16_t port);
 73         // Release all resources
 74         void clear_all();
 75         // Register a client context
 76         void add_client(io_data* data);
 77         // Remove a client context
 78         void del_client(SOCKET sock);
 79         // Look up a client context
 80         std::map<SOCKET, io_data*>::iterator find_client(SOCKET sock);
 81         // Post an accept request
 82         void post_accept(io_operate* io);
 83         // Post a recv request
 84         bool post_recv(io_data* data, io_operate* operate);
 85         // Handle a completed accept
 86         void accept_process(io_data* data, io_operate* operate);
 87         // Handle a completed receive
 88         void recv_process(io_data* data, io_operate* operate);
 89         // Handle a disconnect
 90         void close_process(io_data* data, io_operate* operate);
 91     protected:
 92     public:
 93         iocp_model();
 94         ~iocp_model();
 95         // Start the server
 96         void start(uint16_t port);
 97         // Shut the server down
 98         void close();
 99         // Install the handler for received data
100         void set_data_process(fun1&& func);
101     };
102 }

iocp_model.cpp

  1 #include "iocp_model.h"
  2 #include <assert.h>
  3 #include <stdio.h>
  4 #include <Windows.h>
  5 #pragma comment(lib, "Mswsock.lib")
  6 #pragma comment(lib, "ws2_32.lib")
  7 
  8 #define del_ptr(ptr) if (ptr) { delete ptr; ptr = nullptr; }
  9 #define clear_socket(socket) if (socket != INVALID_SOCKET) { closesocket(socket); socket = INVALID_SOCKET; }
 10 #define clear_handle(handle) if (handle != NULL && handle != INVALID_HANDLE_VALUE) { CloseHandle(handle); handle = NULL; }
 11 #define recv_func(addr, data) if (_data_process) { (*_data_process)(addr, data); }
 12 
 13 namespace iocp {
 14     inline io_operate::io_operate()
            : overlapped({}),
              accept(INVALID_SOCKET),
              wsabuf({ buff_len, buffer }),    // the descriptor always points at this object's own buffer
              type(operation_type::null) {
 15         // Start out with an all-zero buffer
            reset_buf();
 16     }
 17     inline io_operate::~io_operate() {
            // The accept socket is intentionally not closed here: closing it always produced error 955 or 1236,
            // probably because an I/O operation was still outstanding; not solved yet.
 18         //clear_socket(accept);
 19     }
        // Fill the whole data buffer with zero bytes
 20     void io_operate::reset_buf() {
 21         memset(buffer, 0, sizeof(buffer));
 22     }
 23 
 24     inline io_data::io_data() : socket(INVALID_SOCKET), addr_info({}) {}
 25     inline io_data::~io_data() {
            // clear_socket already skips INVALID_SOCKET, so no extra guard is needed
 27         clear_socket(socket);
            // Free every I/O operation context still owned by this socket
 29         for (size_t i = 0; i < operate_data.size(); ++i) {
 30             delete operate_data[i];
 31         }
 32         operate_data.clear();
 33     }
 34     // Allocate a fresh I/O operation context, append it to operate_data and return it
 35     io_operate* io_data::add_io_operate() {
 36         io_operate* created = new io_operate;
            operate_data.push_back(created);
 37         return created;
 38     }
 39     // Remove the given I/O operation context from this socket and free it.
        // Only a context that is actually stored in operate_data is freed, so calling
        // this again with the same pointer is a harmless no-op (no double delete).
 40     void io_data::del_io_operate(io_operate* data) {
 41         auto re = std::find(operate_data.begin(), operate_data.end(), data);
 42         if (re != operate_data.end()) {
 43             operate_data.erase(re);
                // operate_data owns its elements (add_io_operate allocates them with new and
                // the destructor deletes them); erasing without deleting leaked the object.
                delete data;
 44         }
 45     }
 46 
 47     iocp_model::iocp_model() : _thread_pool(&thread_pool::instance()), _work_num(2 * std::thread::hardware_concurrency()), _exit(NULL), _complete_port(NULL), _data_process(nullptr) {}
 48     iocp_model::~iocp_model() {}
 49 
 50     void iocp_model::init_socket(uint16_t port) {
            // Start Winsock 2.2, create the overlapped listening socket, associate it with the
            // completion port, then bind it to `port` on any local address and start listening.
            // Failures are reported on stdout; the listening socket is then left as INVALID_SOCKET.
            // WSAStartup/bind/listen are called outside assert() so they still run when NDEBUG
            // compiles the assertions away.
 51         try {
 52             WSADATA wsa;
 53             if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) {
                    throw "初始化Winsock失败";
                }
 54             if (LOBYTE(wsa.wVersion) != 2 || HIBYTE(wsa.wVersion) != 2) {
 55                 throw "版本号错误";
 56             }
 57             // Create the listening socket
 58             _listen.socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
 59             if (_listen.socket == INVALID_SOCKET) {
 60                 throw "初始化监听套接字失败";
 61             }
 62             // Associate the listening socket with the completion port (key = &_listen)
 63             if (CreateIoCompletionPort((HANDLE)_listen.socket, _complete_port, (ULONG_PTR)&_listen, 0) == NULL) {
 65                 throw "绑定监听套接字到完成端口失败";
 66             }
 67             // Fill in the listening address
 68             SOCKADDR_IN sockaddr = {};
 69             sockaddr.sin_family = AF_INET;                // address family (network protocol in use)
 70             sockaddr.sin_addr.S_un.S_addr = ADDR_ANY;    // any local address
 71             sockaddr.sin_port = htons(port);            // port number
 72             // Bind the port
 73             if (bind(_listen.socket, (SOCKADDR*)&sockaddr, sizeof(SOCKADDR)) == SOCKET_ERROR) {
                    throw "绑定端口失败";
                }
 74             // Start listening
 75             if (listen(_listen.socket, SOMAXCONN) == SOCKET_ERROR) {
                    throw "监听端口失败";
                }
 76         }
 77         catch (const char* err) {
 78             printf("%s(Error: %d)\n", err, WSAGetLastError());
                // Do not leave a half-configured listening socket behind
                clear_socket(_listen.socket);
 79             WSACleanup();
 80         }
 81     }
 82 
 83     void iocp_model::clear_all() {
            // Releases every resource owned by the model: client contexts, the listen socket's
            // I/O operation contexts, the critical section, both handles and the listening socket.
            // NOTE(review): the critical section is deleted here, so this presumably must not run
            // twice or while another thread can still enter _critical -- confirm against callers.
 84         // Free the client contexts and the listen operations while holding the lock
 85         EnterCriticalSection(&_critical);
 86         for (auto data : _client) {
 87             del_ptr(data.second);
 88         }
 89         for (auto data : _listen.operate_data) {
 90             del_ptr(data);
 91         }
            // The loops above only null their own copies of the pointers; drop the stale entries here
 92         _client.clear();
 93         _listen.operate_data.clear();
 94         LeaveCriticalSection(&_critical);
 95         // Destroy the critical section
 96         DeleteCriticalSection(&_critical);
 97         // Close the handles
 98         clear_handle(_exit);
 99         clear_handle(_complete_port);
100         // Close the listening socket
101         clear_socket(_listen.socket);
102     }
103 
104     void iocp_model::add_client(io_data* data) {
            // Store (or overwrite) the context under its socket, then report the client count
105         io_data*& slot = _client[data->socket];
            slot = data;
106         printf("accept_client num: %zd\n", _client.size());
107     }
108 
109     void iocp_model::del_client(SOCKET sock) {
            // Destroy and unregister the client context stored for `sock`, if any, under the lock
110         EnterCriticalSection(&_critical);
111         const auto entry = _client.find(sock);
112         const bool known = (entry != _client.end());
            if (known) {
                // delete on a null pointer is a no-op, matching the guarded del_ptr
113             delete entry->second;
114             _client.erase(entry);
115             printf("close_client num: %zd\n", _client.size());
116         }
117         LeaveCriticalSection(&_critical);
118     }
119 
120     std::map<SOCKET, io_data*>::iterator iocp_model::find_client(SOCKET sock) {
            // map::find already yields _client.end() for an unknown socket
121         return _client.find(sock);
126     }
127 
128     void iocp_model::post_accept(io_operate* operate) {
            // Post one overlapped AcceptEx on the listening socket using `operate` as its context.
            // A new socket is created to receive the connection. On failure the error is printed,
            // that socket is closed and `operate` is removed from the listen context; the caller
            // must not touch `operate` afterwards.
129         assert(_listen.socket != INVALID_SOCKET);
130         try {
131             // Prepare the operation context
132             operate->type = operation_type::accept;
133             operate->reset_buf();
                // A reused context still carries the state of its previous operation
                memset(&operate->overlapped, 0, sizeof(operate->overlapped));
134             // Prepare the arguments
135             DWORD bytes = 0;
136             DWORD addr_len = sizeof(SOCKADDR_IN) + 16;
137             WSABUF* wsabuf = &operate->wsabuf;
138             OVERLAPPED* overlapped = &operate->overlapped;
139             // Create the socket that will receive the client connection
140             operate->accept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
141             if (operate->accept == INVALID_SOCKET) {
142                 throw "创建Socket失败";
143             }
144             // Post the AcceptEx request; the tail of the buffer is reserved for the two addresses
145             if (!_acceptex_ptr(_listen.socket, operate->accept, wsabuf->buf, wsabuf->len - (addr_len * 2), addr_len, addr_len, &bytes, overlapped) && WSAGetLastError() != WSA_IO_PENDING) {
146                 throw "创建AcceptEx请求失败";
147             }
148         }
149         catch (const char* err) {
150             printf("%s(Error: %d)\n", err, WSAGetLastError());
                // Nothing is pending on the socket created above, so it would otherwise leak
                clear_socket(operate->accept);
151             EnterCriticalSection(&_critical);
152             _listen.del_io_operate(operate);
153             LeaveCriticalSection(&_critical);
154         }
155     }
156 
157     bool iocp_model::post_recv(io_data* data, io_operate* operate) {
            // Post one overlapped WSARecv on operate->accept using `operate` as its context.
            // Returns true when the request completed or is pending. On failure `operate` is
            // removed from `data` and false is returned; the caller must not touch `operate` afterwards.
158         assert(_listen.socket != INVALID_SOCKET);
159         // Prepare the operation context
160         operate->reset_buf();
161         operate->type = operation_type::recv;
            // A reused context still carries the state of its previous operation
            memset(&operate->overlapped, 0, sizeof(operate->overlapped));
            // The data callback only receives a char*, so keep the last (zeroed) byte out of
            // reach of WSARecv: the buffer is then always NUL-terminated, even on a full read.
            operate->wsabuf.buf = operate->buffer;
            operate->wsabuf.len = buff_len - 1;
162         // Prepare the arguments
163         DWORD flags = 0, bytes = 0;
164         WSABUF* wsabuf = &operate->wsabuf;
165         OVERLAPPED* overlapped = &operate->overlapped;
166         // Post the recv request
167         if ((SOCKET_ERROR == WSARecv(operate->accept, wsabuf, 1, &bytes, &flags, overlapped, NULL)) && (WSA_IO_PENDING != WSAGetLastError())) {
168             EnterCriticalSection(&_critical);
169             data->del_io_operate(operate);
170             LeaveCriticalSection(&_critical);
171             return false;
172         }
173         return true;
174     }
175 
176     void iocp_model::accept_process(io_data* data, io_operate* operate) {
            // Handle a completed AcceptEx: report the peer, pass the first chunk of data to the data
            // callback, create the client context, associate the client socket with the completion
            // port, post the first recv and register the client. The accept request is re-posted in
            // every case, so a failure with one client does not cost an accept slot.
            // `data` (the listen context) is not used.
178         try {
177             // Prepare the arguments
179             SOCKADDR_IN* client_addr = nullptr;
180             SOCKADDR_IN* local_addr = nullptr;
181             int addr_len = sizeof(SOCKADDR_IN);
182             char ip[16] = {};
183             // Use GetAcceptExSockaddrs to get the client/local SOCKADDR_IN and the client's first chunk of data
184             _acceptex_addr_ptr(operate->wsabuf.buf, operate->wsabuf.len - ((addr_len + 16) * 2), addr_len + 16, addr_len + 16, (LPSOCKADDR*)&local_addr, &addr_len, (LPSOCKADDR*)&client_addr, &addr_len);
185             printf("客户端 %s:%d 连接到服务器\n", inet_ntop(AF_INET, &client_addr->sin_addr, ip, sizeof(ip)), ntohs(client_addr->sin_port));
186             // Hand the received data to the callback
187             recv_func(client_addr, operate->buffer);
188             // Record the new client in its own context
189             io_data* new_data = new io_data;
190             new_data->socket = operate->accept;
191             memcpy_s(&(new_data->addr_info), addr_len, client_addr, addr_len);
192             // Associate the client socket with the completion port. The completion key must be the
                // client's io_data, because the worker loop reads every key back as an io_data*.
193             if (CreateIoCompletionPort((HANDLE)new_data->socket, _complete_port, (ULONG_PTR)new_data, 0) == 0) {
194                 del_ptr(new_data);
195                 throw "绑定客户端socket到完成端口失败";
196             }
197             // Post the first recv request for the client
198             io_operate* new_operate = new_data->add_io_operate();
199             new_operate->type = operation_type::recv;
200             new_operate->accept = new_data->socket;
                // Hold the lock from posting until registration so the client is in _client
                // before anyone who takes the lock can look it up
201             EnterCriticalSection(&_critical);
202             const bool posted = post_recv(new_data, new_operate);
                if (posted) {
207                 // Register the client
208                 add_client(new_data);
                }
209             LeaveCriticalSection(&_critical);
                if (!posted) {
                    // post_recv already removed new_operate; destroying the context also closes the socket
                    del_ptr(new_data);
                    throw "投递recv请求失败";
                }
212         }
213         catch (const char* err) {
214             printf("%s(Error: %d)\n", err, WSAGetLastError());
215         }
            // Re-post the accept request (post_accept gives `operate` a fresh socket)
            post_accept(operate);
216     }
217 
218     void iocp_model::recv_process(io_data* data, io_operate* operate) {
            // Handle a completed recv: pass the received buffer to the data callback together with
            // the client's address, then post the next recv on the same operation context.
            // _client is modified by other threads under _critical, so the lookup takes the lock
            // too; the address is copied out so the callback runs without holding it.
            SOCKADDR_IN peer = {};
            bool known = false;
            EnterCriticalSection(&_critical);
219         auto re = find_client(operate->accept);
220         if (re != _client.end()) {
                peer = re->second->addr_info;
                known = true;
222         }
            LeaveCriticalSection(&_critical);
            if (known) {
                recv_func(&peer, operate->buffer);
            }
223         post_recv(data, operate);
224     }
225 
226     void iocp_model::close_process(io_data* data, io_operate* operate) {
            // Handle a disconnect: look the client up by operate->accept, report it, destroy its
            // context and unregister it, all while holding the lock. `data` is not used.
227         EnterCriticalSection(&_critical);
228         auto re = find_client(operate->accept);
229         if (re != _client.end()) {
230             char ip[16];
231             printf("客户端 %s:%d 关闭连接\n", inet_ntop(AF_INET, &re->second->addr_info.sin_addr, ip, sizeof(ip)), ntohs(re->second->addr_info.sin_port));
                // Destroying the io_data closes its socket and deletes the I/O operation contexts it
                // owns -- presumably including `operate`, so `operate` must not be used after this line
232             del_ptr(re->second);
233             _client.erase(re);
234         }
            // Printed whether or not the client was found
235         printf("close_client num: %zd\n", _client.size());
236         LeaveCriticalSection(&_critical);
237         //del_client(operate->accept);
238     }
239 
240     void iocp_model::start(uint16_t port) {
            // Bring the server up on `port`: create the critical section, the completion port and
            // the exit event, set up the listening socket, resolve AcceptEx/GetAcceptExSockaddrs,
            // post the initial accept requests and start the worker threads on the thread pool.
            // If setup fails, everything is released and no worker is started.
241         // Initialise the critical section
242         InitializeCriticalSection(&_critical);
243         // Create the completion port
244         _complete_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
245         assert(_complete_port != NULL);
            // Create the manual-reset exit event that close() signals; without it the workers'
            // WaitForSingleObject and close()'s SetEvent would operate on a NULL handle
            _exit = CreateEvent(NULL, TRUE, FALSE, NULL);
246         // Initialise the listening socket
247         init_socket(port);
248         try {
                if (_exit == NULL) {
                    throw "创建退出事件失败";
                }
249             GUID guid_acceptex = WSAID_ACCEPTEX;
250             GUID guid_acceptex_addr = WSAID_GETACCEPTEXSOCKADDRS;
251             DWORD bytes = 0;
252             // Resolve the AcceptEx function pointer
253             if (WSAIoctl(_listen.socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid_acceptex, sizeof(GUID), &_acceptex_ptr, sizeof(LPFN_ACCEPTEX), &bytes, NULL, NULL) == SOCKET_ERROR) {
254                 throw "获取AcceptEx函数指针失败";
255             }
256             // Resolve the GetAcceptExSockaddrs function pointer
257             if (WSAIoctl(_listen.socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid_acceptex_addr, sizeof(GUID), &_acceptex_addr_ptr, sizeof(LPFN_GETACCEPTEXSOCKADDRS), &bytes, NULL, NULL) == SOCKET_ERROR) {
258                 throw "获取GetAcceptExSockAddrs函数指针失败";
259             }
260             for (int i = 0; i < post_accept_num; ++i) {
261                 // Post an AcceptEx request
262                 post_accept(_listen.add_io_operate());
263             }
264         }
265         catch (const char* err) {
266             printf("%s(Error: %d)\n", err, WSAGetLastError());
267             clear_all();
                // Everything has been released (including the critical section), so no worker
                // may be started: the last worker to exit would run clear_all() a second time
                return;
268         }
269         // Start the worker threads. The count is captured first because exiting workers
            // decrement _work_num, possibly while this loop is still running.
            const int worker_total = _work_num;
272         for (int i = 0; i < worker_total; ++i) {
273             _thread_pool->add([this](uint8_t id) {
274                 printf("工作线程%d启动\n", id);
275                 // Per-thread state
276                 io_data* data = nullptr;
277                 OVERLAPPED* overlapped = nullptr;
278                 BOOL status;
279                 DWORD bytes = 0;
280                 io_operate* operate = nullptr;
281                 while (WaitForSingleObject(_exit, 0) != WAIT_OBJECT_0) {
282                     try {
                            // Forget the previous packet so that a failed dequeue cannot be
                            // mistaken for a completion of the operation handled last time
                            data = nullptr;
                            overlapped = nullptr;
                            bytes = 0;
283                         status = GetQueuedCompletionStatus(_complete_port, &bytes, (PULONG_PTR)&data, &overlapped, INFINITE);
                            // No key/overlapped: shutdown packet from close(), or the port itself failed
284                         if (data == NULL || overlapped == NULL) {
285                             break;
286                         }
                            operate = CONTAINING_RECORD(overlapped, io_operate, io_operate::overlapped);
287                         if (!status) {
288                             DWORD err = GetLastError();
289                             if (err == WAIT_TIMEOUT) {
290                                 if (send(data->socket, "", 0, 0) == SOCKET_ERROR) {
291                                     throw "客户端异常断开";
292                                 }
293                                 else {
294                                     throw "等待的操作过时";
295                                 }
296                             }
297                             else if (err == ERROR_NETNAME_DELETED) {
298                                 throw "指定的网络名不再可用";
299                             }
300                             else if (err == ERROR_OPERATION_ABORTED) {
301                                 throw "由于线程退出或应用程序请求,已中止 I/O 操作";
302                             }
303                             else {
304                                 printf("其他错误, 退出线程!\n");
305                                 break;
306                             }
307                         }
308                         else {
310                             // Zero bytes on a recv means the client disconnected (or sent 0 bytes).
                                // An accept that completes with zero bytes is still a new connection and
                                // must go through accept_process, otherwise its accept slot is lost.
311                             if (bytes == 0 && operate->type != operation_type::null && operate->type != operation_type::accept) {
312                                 operate->type = operation_type::close;
313                             }
314                             // Dispatch on the operation type
315                             switch (operate->type) {
316                             case operation_type::accept: {
317                                 accept_process(data, operate);
318                             } break;
319                             case operation_type::send: {
320                                 printf("收到来自客户端的发送请求\n");
321                             } break;
322                             case operation_type::recv: {
323                                 recv_process(data, operate);
324                             } break;
325                             case operation_type::close: {
326                                 close_process(data, operate);
327                             } break;
328                             default: {
329                                 printf("I/O操作类型异常\n");
330                             }
331                             }
332                         }
333                     }
334                     catch (const char* err) {
                            // Read the error code before any further API call can overwrite it
                            const int code = WSAGetLastError();
335                         operate = CONTAINING_RECORD(overlapped, io_operate, io_operate::overlapped);
                            if (operate->type == operation_type::accept) {
                                // A failed accept: drop its socket and put the accept slot back in service
                                clear_socket(operate->accept);
                                post_accept(operate);
                            }
                            else {
                                // A failed recv means the client is gone: re-queue the packet as a close
                                // so that close_process releases the client. Anything else is re-queued
                                // as a no-op, as before.
336                             operate->type = (operate->type == operation_type::recv) ? operation_type::close : operation_type::null;
337                             PostQueuedCompletionStatus(_complete_port, 0, (ULONG_PTR)data, overlapped);
                            }
338                         printf("%s [已重启IOCP] (Error: %d)\n", err, code);
339                     }
340                 }
341                 printf("工作线程(%d)退出\n", id);
                    // Several workers may exit at the same time; count them down under the lock
                    // so that exactly one of them performs the final cleanup
                    EnterCriticalSection(&_critical);
342                 const bool last_worker = (--_work_num == 0);
                    LeaveCriticalSection(&_critical);
                    if (last_worker) {
343                     clear_all();
344                 }
345                 }, static_cast<uint8_t>(i + 1));
346         }
347     }
348 
349     void iocp_model::close() {
            // Ask all worker threads to exit: signal the exit event, then queue one wake-up
            // packet (null key, null overlapped) per worker so that threads blocked in
            // GetQueuedCompletionStatus return. Does nothing when the server is not listening.
350         if (_listen.socket != INVALID_SOCKET) {
                // Exiting workers decrement _work_num; read it once before waking anyone, or the
                // loop would stop early and leave some workers blocked forever
                const int workers = _work_num;
351             // Tell the threads to exit
352             SetEvent(_exit);
353             // Wake the threads blocked on the completion port
354             for (int i = 0; i < workers; ++i) {
                    PostQueuedCompletionStatus(_complete_port, 0, 0, NULL);
                }
355         }
356     }
357 
358     void iocp_model::set_data_process(fun1&& func) {
            // Take ownership of the callback and make it the handler for received data
359         _data_process.reset(new fun1(std::move(func)));
360     }
361 }

 

标签:改之,socket,IOCP,代码,client,io,operate,data,addr
来源: https://www.cnblogs.com/muzzik/p/12615129.html