C++ IOCP模型代码(改之小猪代码)
作者:互联网
昨天看了一下小猪的代码,自己也下载下来学习了一下,发现存在内存泄漏、线程出现错误一直等待I/O完成导致一直休眠的问题,
然后按照自己的代码风格修改了一些问题,现在不会出现内存泄漏和常见线程错误导致一直休眠的问题了。代码如下,感兴趣的小伙伴可以自己复制运行测试
推荐这篇转载至小猪的文章,有配图https://blog.csdn.net/weixin_30360497/article/details/95383219
iocp_model.h
1 #pragma once 2 #include <WinSock2.h> 3 #include <MSWSock.h> 4 #include <Ws2tcpip.h> 5 #include <cstdint> 6 #include <vector> 7 #include <map> 8 #include "thread_pool.h" 9 #include "lock_free_stack.h" 10 11 namespace iocp { 12 using fun1 = typename std::function<void(SOCKADDR_IN*, char*)>; 13 14 // 缓存大小 15 const uint16_t buff_len = 4096; 16 // 投递Accept请求数量(灵活设置:根据流量大小) 17 const uint8_t post_accept_num = 8; 18 // iocp_model函数指针 19 class iocp_model; 20 21 // 操作类型--_OPERATION_TYPE 22 enum class operation_type { 23 null, 24 accept, //连接 25 send, //发送 26 recv, //接收 27 close, //关闭 28 }; 29 30 // I/O操作--_PER_IO_CONTEXT 31 struct io_operate { 32 OVERLAPPED overlapped; //I/O信息 33 SOCKET accept; //连接套接字 34 WSABUF wsabuf; //存储数据缓冲区 35 char buffer[buff_len]; //缓冲区 36 operation_type type; //I/O操作类型 37 io_operate(); 38 ~io_operate(); 39 // 重置缓冲区 40 void reset_buf(); 41 }; 42 43 // socket信息--_PER_SOCKET_CONTEXT 44 struct io_data { 45 SOCKET socket; //套接字 46 SOCKADDR_IN addr_info; //地址信息 47 std::vector<io_operate*> operate_data; //I/O操作上下文 48 io_data(); 49 ~io_data(); 50 // 添加I/O操作数据 51 io_operate* add_io_operate(); 52 // 删除指定I/O操作数据 53 void del_io_operate(io_operate* data); 54 }; 55 56 // iocp模型 57 class iocp_model { 58 private: 59 thread_pool* _thread_pool; //线程池 60 io_data _listen; //监听信息 61 std::map<SOCKET, io_data*> _client; //客户端信息 62 uint8_t _work_num; //工作线程数 63 HANDLE _exit; //退出事件句柄 64 HANDLE _complete_port; //完成端口句柄 65 io_operate _io_operate; //io标记 66 CRITICAL_SECTION _critical; //临界区 67 LPFN_ACCEPTEX _acceptex_ptr; //AcceptEx函数指针 68 LPFN_GETACCEPTEXSOCKADDRS _acceptex_addr_ptr; //GetAcceptExSockaddrs函数指针 69 std::shared_ptr<fun1> _data_process; //recv数据处理函数 70 71 // 初始化socket 72 void init_socket(uint16_t port); 73 // 清理资源 74 void clear_all(); 75 // 添加客户端数据 76 void add_client(io_data* data); 77 // 删除客户端数据 78 void del_client(SOCKET sock); 79 // 查找客户端数据 80 std::map<SOCKET, io_data*>::iterator find_client(SOCKET sock); 81 // 投递accept请求 82 void post_accept(io_operate* io); 83 // 投递recv请求 84 bool post_recv(io_data* data, io_operate* operate); 85 // 连接处理 86 void accept_process(io_data* data, io_operate* operate); 87 // 接收处理 88 void recv_process(io_data* data, io_operate* operate); 89 // 断开处理 90 void close_process(io_data* data, io_operate* operate); 91 protected: 92 public: 93 iocp_model(); 94 ~iocp_model(); 95 // 启动 96 void start(uint16_t port); 97 // 关闭 98 void close(); 99 // 设置recv数据处理函数 100 void set_data_process(fun1&& func); 101 }; 102 }
iocp_model.cpp
1 #include "iocp_model.h" 2 #include <assert.h> 3 #include <stdio.h> 4 #include <Windows.h> 5 #pragma comment(lib, "Mswsock.lib") 6 #pragma comment(lib, "ws2_32.lib") 7 8 #define del_ptr(ptr) if (ptr) { delete ptr; ptr = nullptr; } 9 #define clear_socket(socket) if (socket != INVALID_SOCKET) { closesocket(socket); socket = INVALID_SOCKET; } 10 #define clear_handle(handle) if (handle != NULL && handle != INVALID_HANDLE_VALUE) { CloseHandle(handle); handle = NULL; } 11 #define recv_func(addr, data) if (_data_process) { (*_data_process)(addr, data); } 12 13 namespace iocp { 14 inline io_operate::io_operate() : overlapped({}), accept(INVALID_SOCKET), wsabuf({ buff_len, buffer }), type(operation_type::null) { 15 reset_buf(); 16 } 17 inline io_operate::~io_operate() { 18 //clear_socket(accept); 关闭这个socket之后总是出现955或者1236错误,应该是有io操作未完成导致的,还没解决.. 19 } 20 void io_operate::reset_buf() { 21 memset(buffer, 0, buff_len); 22 } 23 24 inline io_data::io_data() : socket(INVALID_SOCKET), addr_info({}) {} 25 inline io_data::~io_data() { 26 if (socket != INVALID_SOCKET) { 27 clear_socket(socket); 28 } 29 for (auto data : operate_data) { 30 delete data; 31 } 32 operate_data.clear(); 33 } 34 // 添加I/O操作数据 35 io_operate* io_data::add_io_operate() { 36 operate_data.push_back(new io_operate); 37 return operate_data.back(); 38 } 39 // 删除指定I/O操作数据 40 void io_data::del_io_operate(io_operate* data) { 41 auto re = std::find(operate_data.begin(), operate_data.end(), data); 42 if (re != operate_data.end()) { 43 operate_data.erase(re); 44 } 45 } 46 47 iocp_model::iocp_model() : _thread_pool(&thread_pool::instance()), _work_num(2 * std::thread::hardware_concurrency()), _exit(NULL), _complete_port(NULL), _data_process(nullptr) {} 48 iocp_model::~iocp_model() {} 49 50 void iocp_model::init_socket(uint16_t port) { 51 try { 52 WSADATA wsa; 53 assert(WSAStartup(MAKEWORD(2, 2), &wsa) == 0); 54 if (LOBYTE(wsa.wVersion) != 2 || HIBYTE(wsa.wVersion) != 2) { 55 throw "版本号错误"; 56 } 57 // 构造监听Socket 58 _listen.socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED); 59 if (_listen.socket == INVALID_SOCKET) { 60 throw "初始化监听套接字失败"; 61 } 62 // 绑定监听Socket到完成端口 63 if (CreateIoCompletionPort((HANDLE)_listen.socket, _complete_port, (ULONG_PTR)&_listen, 0) == NULL) { 64 clear_socket(_listen.socket); 65 throw "绑定监听套接字到完成端口失败"; 66 } 67 // 配置监听信息 68 SOCKADDR_IN sockaddr = {}; 69 sockaddr.sin_family = AF_INET; //协议簇(使用的网络协议) 70 sockaddr.sin_addr.S_un.S_addr = ADDR_ANY; //(任何地址) 71 sockaddr.sin_port = htons(port); //(端口号) 72 // 绑定端口 73 assert(bind(_listen.socket, (SOCKADDR*)&sockaddr, sizeof(SOCKADDR)) != SOCKET_ERROR); 74 // 开始监听 75 assert(listen(_listen.socket, SOMAXCONN) != SOCKET_ERROR); 76 } 77 catch (const char* err) { 78 printf("%s(Error: %d)\n", err, WSAGetLastError()); 79 WSACleanup(); 80 } 81 } 82 83 void iocp_model::clear_all() { 84 // 清理客户端数据 85 EnterCriticalSection(&_critical); 86 for (auto data : _client) { 87 del_ptr(data.second); 88 } 89 for (auto data : _listen.operate_data) { 90 del_ptr(data); 91 } 92 _client.clear(); 93 _listen.operate_data.clear(); 94 LeaveCriticalSection(&_critical); 95 // 释放临界区 96 DeleteCriticalSection(&_critical); 97 // 释放句柄 98 clear_handle(_exit); 99 clear_handle(_complete_port); 100 // 关闭socket 101 clear_socket(_listen.socket); 102 } 103 104 void iocp_model::add_client(io_data* data) { 105 _client[data->socket] = data; 106 printf("accept_client num: %zd\n", _client.size()); 107 } 108 109 void iocp_model::del_client(SOCKET sock) { 110 EnterCriticalSection(&_critical); 111 auto re = _client.find(sock); 112 if (re != _client.end()) { 113 del_ptr(re->second); 114 _client.erase(re); 115 printf("close_client num: %zd\n", _client.size()); 116 } 117 LeaveCriticalSection(&_critical); 118 } 119 120 std::map<SOCKET, io_data*>::iterator iocp_model::find_client(SOCKET sock) { 121 auto re = _client.find(sock); 122 if (re != _client.end()) { 123 return re; 124 } 125 return _client.end(); 126 } 127 128 void iocp_model::post_accept(io_operate* operate) { 129 assert(_listen.socket != INVALID_SOCKET); 130 try { 131 // 设置io操作数据 132 operate->type = operation_type::accept; 133 operate->reset_buf(); 134 // 准备参数 135 DWORD bytes = 0; 136 DWORD addr_len = sizeof(SOCKADDR_IN) + 16; 137 WSABUF* wsabuf = &operate->wsabuf; 138 OVERLAPPED* overlapped = &operate->overlapped; 139 // 准备客户端socket 140 operate->accept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); 141 if (operate->accept == INVALID_SOCKET) { 142 throw "创建Socket失败"; 143 } 144 // 投递AcceptEx请求 145 if (!_acceptex_ptr(_listen.socket, operate->accept, wsabuf->buf, wsabuf->len - (addr_len * 2), addr_len, addr_len, &bytes, overlapped) && WSAGetLastError() != WSA_IO_PENDING) { 146 throw "创建AcceptEx请求失败"; 147 } 148 } 149 catch (const char* err) { 150 printf("%s(Error: %d)\n", err, WSAGetLastError()); 151 EnterCriticalSection(&_critical); 152 _listen.del_io_operate(operate); 153 LeaveCriticalSection(&_critical); 154 } 155 } 156 157 bool iocp_model::post_recv(io_data* data, io_operate* operate) { 158 assert(_listen.socket != INVALID_SOCKET); 159 // 设置io操作数据 160 operate->reset_buf(); 161 operate->type = operation_type::recv; 162 // 准备参数 163 DWORD flags = NULL, bytes = NULL; 164 WSABUF* wsabuf = &operate->wsabuf; 165 OVERLAPPED* overlapped = &operate->overlapped; 166 // 投递recv请求 167 if ((SOCKET_ERROR == WSARecv(operate->accept, wsabuf, 1, &bytes, &flags, overlapped, NULL)) && (WSA_IO_PENDING != WSAGetLastError())) { 168 EnterCriticalSection(&_critical); 169 data->del_io_operate(operate); 170 LeaveCriticalSection(&_critical); 171 return false; 172 } 173 return true; 174 } 175 176 void iocp_model::accept_process(io_data* data, io_operate* operate) { 177 // 准备参数 178 try { 179 SOCKADDR_IN* client_addr = nullptr; 180 SOCKADDR_IN* local_addr = nullptr; 181 int addr_len = sizeof(SOCKADDR_IN); 182 char ip[16] = {}; 183 // 通过GetAcceptExSockAddrs获取客户端/本地SOCKADDR_IN以及客户端第一组数据 184 _acceptex_addr_ptr(operate->wsabuf.buf, operate->wsabuf.len - ((addr_len + 16) * 2), addr_len + 16, addr_len + 16, (LPSOCKADDR*)&local_addr, &addr_len, (LPSOCKADDR*)&client_addr, &addr_len); 185 printf("客户端 %s:%d 连接到服务器\n", inet_ntop(AF_INET, &client_addr->sin_addr, ip, sizeof(ip)), ntohs(client_addr->sin_port)); 186 // 处理接收的数据 187 recv_func(client_addr, operate->buffer); 188 // 备份当前客户端数据 189 io_data* new_data = new io_data; 190 new_data->socket = operate->accept; 191 memcpy_s(&(new_data->addr_info), addr_len, client_addr, addr_len); 192 // 绑定客户端socket到完成端口 193 if (CreateIoCompletionPort((HANDLE)new_data->socket, _complete_port, (ULONG_PTR)operate, 0) == 0) { 194 del_ptr(new_data); 195 throw "绑定客户端socket到完成端口失败"; 196 } 197 // 投递recv的I/O请求 198 io_operate* new_operate = new_data->add_io_operate(); 199 new_operate->type = operation_type::recv; 200 new_operate->accept = new_data->socket; 201 EnterCriticalSection(&_critical); 202 if (!post_recv(new_data, new_operate)) { 203 new_data->del_io_operate(new_operate); 204 LeaveCriticalSection(&_critical); 205 throw "投递recv请求失败"; // C26115 206 } 207 // 添加客户端信息 208 add_client(new_data); 209 LeaveCriticalSection(&_critical); 210 // 重新投递accept请求 211 post_accept(operate); 212 } 213 catch (const char* err) { 214 printf("%s(Error: %d)\n", err, WSAGetLastError()); 215 } 216 } 217 218 void iocp_model::recv_process(io_data* data, io_operate* operate) { 219 auto re = find_client(operate->accept); 220 if (re != _client.end()) { 221 recv_func(&re->second->addr_info, operate->buffer); 222 } 223 post_recv(data, operate); 224 } 225 226 void iocp_model::close_process(io_data* data, io_operate* operate) { 227 EnterCriticalSection(&_critical); 228 auto re = find_client(operate->accept); 229 if (re != _client.end()) { 230 char ip[16]; 231 printf("客户端 %s:%d 关闭连接\n", inet_ntop(AF_INET, &re->second->addr_info.sin_addr, ip, sizeof(ip)), ntohs(re->second->addr_info.sin_port)); 232 del_ptr(re->second); 233 _client.erase(re); 234 } 235 printf("close_client num: %zd\n", _client.size()); 236 LeaveCriticalSection(&_critical); 237 //del_client(operate->accept); 238 } 239 240 void iocp_model::start(uint16_t port) { 241 // 初始化临界区 242 InitializeCriticalSection(&_critical); 243 // 建立完成端口 244 _complete_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 245 assert(_complete_port != NULL); 246 // 初始化socket 247 init_socket(port); 248 try { 249 GUID guid_acceptex = WSAID_ACCEPTEX; 250 GUID guid_acceptex_addr = WSAID_GETACCEPTEXSOCKADDRS; 251 DWORD bytes = 0; 252 // 获取AcceptEx函数指针 253 if (WSAIoctl(_listen.socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid_acceptex, sizeof(GUID), &_acceptex_ptr, sizeof(LPFN_ACCEPTEX), &bytes, NULL, NULL) == SOCKET_ERROR) { 254 throw "获取AcceptEx函数指针失败"; 255 } 256 // 获取GetAcceptExSockAddrs函数指针 257 if (WSAIoctl(_listen.socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid_acceptex_addr, sizeof(GUID), &_acceptex_addr_ptr, sizeof(LPFN_GETACCEPTEXSOCKADDRS), &bytes, NULL, NULL) == SOCKET_ERROR) { 258 throw "获取GetAcceptExSockAddrs函数指针失败"; 259 } 260 for (int i = 0; i < post_accept_num; ++i) { 261 // 投递AcceptEx请求 262 post_accept(_listen.add_io_operate()); 263 } 264 } 265 catch (const char* err) { 266 printf("%s(Error: %d)\n", err, WSAGetLastError()); 267 clear_all(); 268 } 269 // 建立工作线程 270 std::atomic<uint8_t> id(0); 271 std::atomic<uint16_t> count(0); 272 for (int i = 0; i < _work_num; ++i) { 273 _thread_pool->add([&](uint8_t id) { 274 printf("工作线程%d启动\n", id); 275 // 准备参数 276 io_data* data = nullptr; 277 OVERLAPPED* overlapped = nullptr; 278 BOOL status; 279 DWORD bytes = 0; 280 io_operate* operate = nullptr; 281 while (WaitForSingleObject(_exit, 0) != WAIT_OBJECT_0) { 282 try { 283 status = GetQueuedCompletionStatus(_complete_port, &bytes, (PULONG_PTR)&data, &overlapped, INFINITE); 284 if (data == NULL) { 285 break; 286 } 287 if (!status) { 288 DWORD err = GetLastError(); 289 if (err == WAIT_TIMEOUT) { 290 if (send(data->socket, "", 0, 0) == SOCKET_ERROR) { 291 throw "客户端异常断开"; 292 } 293 else { 294 throw "等待的操作过时"; 295 } 296 } 297 else if (err == ERROR_NETNAME_DELETED) { 298 throw "指定的网络名不再可用"; 299 } 300 else if (err == ERROR_OPERATION_ABORTED) { 301 throw "由于线程退出或应用程序请求,已中止 I/O 操作"; 302 } 303 else { 304 printf("其他错误, 退出线程!\n"); 305 break; 306 } 307 } 308 else { 309 operate = CONTAINING_RECORD(overlapped, io_operate, io_operate::overlapped); 310 // 客户端正常断开/或者发送0byte数据 311 if (bytes == 0 && operate->type != operation_type::null) { 312 operate->type = operation_type::close; 313 } 314 // I/O操作 315 switch (operate->type) { 316 case operation_type::accept: { 317 accept_process(data, operate); 318 } break; 319 case operation_type::send: { 320 printf("收到来自客户端的发送请求\n"); 321 } break; 322 case operation_type::recv: { 323 recv_process(data, operate); 324 } break; 325 case operation_type::close: { 326 close_process(data, operate); 327 } break; 328 default: { 329 printf("I/O操作类型异常\n"); 330 } 331 } 332 } 333 } 334 catch (const char* err) { 335 operate = CONTAINING_RECORD(overlapped, io_operate, io_operate::overlapped); 336 operate->type = operation_type::null; 337 PostQueuedCompletionStatus(_complete_port, 0, (ULONG_PTR)data, overlapped); 338 printf("%s [已重启IOCP] (Error: %d)\n", err, WSAGetLastError()); 339 } 340 } 341 printf("工作线程(%d)退出\n", id); 342 if (--_work_num == 0) { 343 clear_all(); 344 } 345 }, ++id); 346 } 347 } 348 349 void iocp_model::close() { 350 if (_listen.socket != INVALID_SOCKET) { 351 // 通知线程退出 352 SetEvent(_exit); 353 // 通知完成端口操作退出 354 for (int i = 0; i < _work_num; ++i, PostQueuedCompletionStatus(_complete_port, 0, (DWORD)NULL, NULL)); 355 } 356 } 357 358 void iocp_model::set_data_process(fun1&& func) { 359 _data_process = std::make_shared<fun1>(std::move(func)); 360 } 361 }
标签:改之,socket,IOCP,代码,client,io,operate,data,addr 来源: https://www.cnblogs.com/muzzik/p/12615129.html