curl_multi异步高并发服务实现
作者:互联网
自己开发了一个股票软件,功能很强大,需要的点击下面的链接获取:
https://www.cnblogs.com/bclshuai/p/11380657.html
curl_multi 异步高并发服务实现
目录
1 介绍... 1
2 curl_multi异步实现... 1
2.1 curl_multi_poll方式实现异步curl 2
2.1.1 函数调用步骤... 2
2.1.2 实现方案... 2
2.1.3 遇到的问题... 3
2.1.4 curl_multi_poll异步服务封装类实例... 3
2.2 multi_socket实现异步curl 4
1 介绍
https://curl.se/libcurl/c/libcurl-multi.html
libcurl-easy方式是阻塞执行请求,当请求数量过大,或者需要高并发请求时,同步阻塞模式就会显示出性能瓶颈,执行效率低,延时严重,CPU占用率高,程序阻塞卡顿。所以采用异步方式,可以实现高并发请求的应用场景,异步可以在单线程中同时执行多个请求,等待curl文件标志或者自定义文件标志发生变化时,处理请求结果,支持在几千个平行连接上请求数据,基于事件处理结果。
2 curl_multi异步实现
异步请求有两种方式,同步多线程调用会出现CPU占用率过高的情况,导致界面卡死。
有两种方式
(1) 老方式select来判断请求返回结果
(2) multi_socket。
2.1 curl_multi_poll方式实现异步curl
2.1.1 函数调用步骤
(1) curl_multi_init初始化一个multi handle
(2) curl_easy_init 初始化一个easy handle
(3) curl_easy_setopt 给easyhandle设置各种参数
(4) curl_multi_add_handle添加到multihandle
(5) curl_multi_perform异步执行请求,每次执行返回对列中正在运行的数量,为0时,表示执行结束,结束并不意味着所有的请求都成功了,也可能执行失败了。所以需要循环执行该函数。为了减少循环执行的CPU占用率,可以使用curl_multi_poll函数或者curl_multi_fdset配合select函数来判断是否有结果返回,通知读取数据,减少CPU占用。curl_multi_timeout可以为select提供一个合适的超时时间。
(6) curl_multi_info_read 读取返回结果消息队列中的消息,重复调用,直到消息队列为空。返回数据中有个easy handle 用来标识是哪个请求。
(7) curl_multi_remove_handle 将执行结束的easyhandle从multihandle中移除,表示multihandle不再管理此easyhand,可以销毁释放,也可以修改请求连接url和参数,重新加入,复用连接。
(8) curl_easy_cleanup 执行结束后,先清除easy handle
(9) curl_multi_cleanup 执行这个函数,清除multi handle
2.1.2 实现方案
(1) 实现一个服务,程序调用服务的添加任务接口addTask不断的加入任务。
(2) 创建一个线程1不断的从任务队列中取出任务,分配easy handle,给easy_hand设置url等参数。然后添加到multihandle上,去执行请求;并将easyhand和任务之间用map保存起来,表示正在进行的任务;
(3) 创建线程2不断的select或者curl_multi_poll或者curl_multi_wait或者查看multihandle的状态,看是否有数据返回,有数据返回则读取数据。curl_multi_poll和curl_multi_wait比select更好,可以解决连接上限为1024个的问题。curl_multi_poll和curl_multi_wait区别有两个,一个是curl_multi_poll在被等待的时间内,可以调用curl_multi_wakeup激活,curl_multi_poll会加速返回。而curl_multi_wait无法被激活,只能等到有事件触发,或者超时返回。另外一个区别是如果没有文件描述符可以等待,curl_multi_wait会立刻返回,而curl_multi_poll一定要等到超时时间才能返回。
(4) 读取数据会返回easyhand,用easyhand去map中查找对应的任务;然后根据不同的任务属性去处理数据,调用回调函数,将数据返回给程序。
2.1.3 遇到的问题
(1) 出现崩溃,可能是多线程调用libcurl接口的原因;
(2) curl_multi_add_handle添加easyhand返回失败,errocode:8,CURLM_RECURSIVE_API_CALL,错误原因是从回调内部调用API函数。没有找到解决办法,可能和多线程调用有关。设置超时时间curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30); 超时设置为0时会出现崩溃,设置成0表示不超时;并且设置不发出信号,curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);解决问题。
(3) 当使用的easyhand太多时,200个,会出现错误码CURLE_COULDNT_CONNECT ,7的错误,意识无法连接()到主机或代理。connect refused。连接数量太多,需要创建太多socket连接,而服务器端创建的连接数量有限,导致失败。
(4) 一次向multihand添加1000条任务,curl_multi_perform执行返回任务从1000降到0后,并不是所有分任务都执行完了,读取的数据也就600条左右,需要多次调用curl_multi_info_read去读取数据。
2.1.4 curl_multi_poll异步服务封装类实例
本代码实例采用curl_multi_poll实现异步消息的等待,比select性能更加,去除了select上限1024的束缚。把异步请求调用过程封装成一个服务形式,所有的异步请求都可以发给服务去执行,然后通过回调函数返回结果。其中easyhand做成一个连接池的形式,可以重复使用,并且可以复用连接,提高了请求性能。代码实例中运用到了C++11新特性的内容,包括智能指针、std::thread、std::move等。
(1) 封装类头文件
#pragma once #include"curl.h" #include <mutex> #include <condition_variable> #include"BaseDefine.h" class CurlSelectMulti { public: CurlSelectMulti(); ~CurlSelectMulti(); //全局初始化 static void GlobalInit(); //全局反初始化 void GlobleFint(); //初始化 int init(); //反初始化 void finit(); //添加任务到队列 void addTask(shared_ptr<Task>& task); private: //处理任务,循环从对列中获取数据,添加到muitihand void dealTask(); //检查是否有任务完成 void handTaskResult(); //读取已完成的任务进行解析 void readTaskResult(); //从easyhand队列中获取easyhand,没有则新建一个 CURL* GetCurl(); //新建一个easyhand CURL* CreateCurl(); //将使用完的easyhand放入队列 void PutCurl(CURL* curl); //将任务添加到mulitihand,执行任务 void addTaskToMultiRequest(list<shared_ptr<Task>>& listTask); //给easyhand设置参数 int setTaskParameter(CURL* easyhand, shared_ptr<Task>& task); bool m_bDebug=false; CURL* m_pMultiHand=nullptr;//多操作句柄 list<shared_ptr<Task>> m_listTask;//任务列表 mutex m_taskMutex;//任务列表的锁 mutex m_easyHandMutex;//easyhand队列的锁 list<CURL*>m_listEasyHand;// easyhand队列 bool m_bRunning = true;//线程控制函数 thread m_taskAddThread;//投递任务的的线程 thread m_taskHandThread;//判断任务状态, 处理任务的线程 condition_variable m_conVarTask; map<CURL*, std::shared_ptr<Task>> m_mapRuningTask;//正在执行的任务 mutex m_runningTaskMutex; mutex m_curlApiMutex;//多线程调用curl的接口时会出现崩溃,这里加个锁 int m_curlnum = 0;// int m_successnum=0; int m_failednum = 0; int m_addmultFailed; };
(2) 封装类源文件
#include "stdafx.h" #include "CurlSelectMulti.h" static int OnDebug(CURL *, curl_infotype itype, char * pData, size_t size, void *) { if (itype == CURLINFO_TEXT) { //printf("[TEXT]%s\n", pData); } else if (itype == CURLINFO_HEADER_IN) { printf("[HEADER_IN]%s\n", pData); } else if (itype == CURLINFO_HEADER_OUT) { printf("[HEADER_OUT]%s\n", pData); } else if (itype == CURLINFO_DATA_IN) { printf("[DATA_IN]%s\n", pData); } else if (itype == CURLINFO_DATA_OUT) { printf("[DATA_OUT]%s\n", pData); } return 0; } static size_t OnWriteData(void* buffer, size_t size, size_t nmemb, void* lpVoid) { std::string* str = dynamic_cast<std::string*>((std::string *)lpVoid); if (NULL == str || NULL == buffer) { return -1; } char* pData = (char*)buffer; str->append(pData, size * nmemb); return nmemb; } CurlSelectMulti::CurlSelectMulti() { } CurlSelectMulti::~CurlSelectMulti() { finit(); } void CurlSelectMulti::GlobalInit() { curl_global_init(CURL_GLOBAL_ALL); } void CurlSelectMulti::GlobleFint() { curl_global_cleanup(); } int CurlSelectMulti::init() { //创建一个multi句柄 m_pMultiHand = curl_multi_init(); if (m_pMultiHand == nullptr) { return false; } m_bRunning = true; m_taskAddThread=std::move(thread(std::bind(&CurlSelectMulti::dealTask, this))); m_taskHandThread = std::move(thread(std::bind(&CurlSelectMulti::handTaskResult,this))); //m_taskAddThread.join(); //m_taskHandThread.join(); return true; } void CurlSelectMulti::finit() { //让线程自动退出 m_bRunning = false; //通知不在等待 m_conVarTask.notify_all(); //清除所有的easycurl while (m_listEasyHand.size()>0) { auto it = move(m_listEasyHand.front()); curl_multi_remove_handle(m_pMultiHand,it); curl_easy_cleanup(it); m_listEasyHand.pop_front(); } //清除multihand if (m_pMultiHand != nullptr) { curl_multi_cleanup(m_pMultiHand); m_pMultiHand = nullptr; } } void CurlSelectMulti::addTask(shared_ptr<Task>& task) { if (m_listTask.size() > 5000) { //printf("task is full size %d ,abord task %d", m_listTask.size(),task->taskid); return; } unique_lock<mutex> lk(m_taskMutex); m_listTask.push_back(task); //m_conVarTask.notify_one();//通知有任务添加 lk.unlock(); } CURL* CurlSelectMulti::GetCurl() { CURL* curl = NULL; m_easyHandMutex.lock(); if (m_listEasyHand.size()>0) { curl = m_listEasyHand.front(); m_listEasyHand.pop_front(); } m_easyHandMutex.unlock(); if (curl == NULL) { curl = CreateCurl(); } return curl; } CURL* CurlSelectMulti::CreateCurl() { if (m_curlnum >100)//数量太多会出现连接失败的error { return NULL; } m_curlnum++; printf("curl num %d", m_curlnum); CURL* curl = curl_easy_init(); if (NULL == curl) { return NULL; } if (m_bDebug) { curl_easy_setopt(curl, CURLOPT_VERBOSE, 1); curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, OnDebug); } //curl_easy_setopt(curl, CURLOPT_URL, strUrl.c_str()); curl_easy_setopt(curl, CURLOPT_READFUNCTION, NULL); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, OnWriteData); //curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&strResponse); /* enable TCP keep-alive for this transfer */ curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L); /* keep-alive idle time to 120 seconds */ curl_easy_setopt(curl, CURLOPT_TCP_KEEPIDLE, 300L); /* interval time between keep-alive probes: 60 seconds */ curl_easy_setopt(curl, CURLOPT_TCP_KEEPINTVL, 200L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 100); //支持重定向 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); //保持会话,不用反复创建连接,据说可以提高效率 curl_easy_setopt(curl, CURLOPT_COOKIESESSION, 1); //设置共享dns cache功能,据说能提高性能 curl_share_setopt(curl, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); //不验证主机名称 curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0); curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0); //不验证对端的证书 //curl_easy_setopt(curl, CURLOPT_CAINFO, c->msg._caPath.c_str()); /** * 当多个线程都使用超时处理的时候,同时主线程中有sleep或是wait等操作。 * 如果不设置这个选项,libcurl将会发信号打断这个wait从而导致程序退出。 */ curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 100); return curl; } void CurlSelectMulti::PutCurl(CURL* curl) { m_easyHandMutex.lock(); m_listEasyHand.push_back(curl); m_easyHandMutex.unlock(); //m_conVarEasyHand.notify_all(); } void CurlSelectMulti::dealTask() { while (m_bRunning) { unique_lock<mutex> lk(m_taskMutex); //m_conVarTask.wait(lk);//等待任务添加 if (m_listTask.size() > 0) { list<shared_ptr<Task>> listTask; listTask.swap(m_listTask); lk.unlock(); addTaskToMultiRequest(listTask); } else { lk.unlock(); Sleep(20); } } } void CurlSelectMulti::handTaskResult() { CURLMcode mc = CURLM_OK; int still_running = 0; int ret = 0; while (m_bRunning) { m_curlApiMutex.lock(); //执行请求,并返回正在执行的请求数量 mc = curl_multi_perform(m_pMultiHand, &still_running); //printf("still running num%d,%d\n", still_running, mc); m_curlApiMutex.unlock(); //等待有任务完成的通知,有结果时立刻返回,没有结果时1000ms后等待结束返回,ret返回完成的任务数量 mc = curl_multi_poll(m_pMultiHand, NULL, 0, 1000, &ret); if (mc == CURLM_OK)//有任务 { readTaskResult(); } else { printf("curl_multi_poll error %d", mc); } //if (still_running>0)//有正在执行的请求任务 //{ // while (still_running>0) // { // // //再次执行curl_multi_perform,更新still_running // mc = curl_multi_perform(m_pMultiHand, &still_running); // //printf("still running num%d\n", still_running); // } // printf("task finish\n"); //} //else//没有任务,则等一会,避免一直循环,cpu占用过高 //{ // Sleep(100); //} } } void CurlSelectMulti::readTaskResult() { CURLMsg* m = NULL; do { int msgq = 0; m_curlApiMutex.lock(); m = curl_multi_info_read(m_pMultiHand, &msgq); m_curlApiMutex.unlock(); if (m && (m->msg == CURLMSG_DONE)) { CURL *e = m->easy_handle; // 数据处理 auto it = m_mapRuningTask.find(e); if (it != m_mapRuningTask.end()) { if (m->data.result != 0) { m_failednum++; printf("request error %d,failednum%d,taskid%d,%s\n", m->data.result, m_failednum, m_mapRuningTask[e]->taskid, m_mapRuningTask[e]->strUrl.c_str()); } else { m_successnum++; printf("request success successnum%d,id %d, \n ", m_successnum, m_mapRuningTask[e]->taskid);//, , m_mapRuningTask[e]->strResponse.c_str() } //移除easyhand m_curlApiMutex.lock(); curl_multi_remove_handle(m_pMultiHand, e); m_curlApiMutex.unlock(); //从map中移除 m_runningTaskMutex.lock(); if (it->second->headers != nullptr)//清除数据 { curl_slist_free_all(it->second->headers); } m_mapRuningTask.erase(it); m_runningTaskMutex.unlock(); //放回对列中重复使用 PutCurl(e); } else { //移除easyhand m_curlApiMutex.lock(); curl_multi_remove_handle(m_pMultiHand, e); m_curlApiMutex.unlock(); printf( "find map key failed" ); PutCurl(e); } } } while (m); } void CurlSelectMulti::addTaskToMultiRequest(list<shared_ptr<Task>>& listTask) { while (listTask.empty()==false) { auto item = listTask.front(); CURL* easyhand = GetCurl();//获取easyhand if (easyhand == NULL) { //unique_lock<mutex> lk(m_easyHandMutex); //m_conVarEasyHand.wait(lk);//等待有easyhand被放入 Sleep(1); continue; } //根据任务设置参数,设置url,timeout等参数到easyhand //使用智能指针,指向在堆上创建的对象 /*shared_ptr<Task> task(new Task()); *task = item;*/ if (setTaskParameter(easyhand, item) != 0) { Sleep(2); PutCurl(easyhand); continue; } //将设置好参数的easyhand添加到multihand m_curlApiMutex.lock(); CURLMcode code = curl_multi_add_handle(m_pMultiHand, easyhand);//当任务数量太大时出现添加失败错误码8,有时还崩溃。 m_curlApiMutex.unlock(); if (code!= CURLM_OK) { m_addmultFailed++; string strerror= curl_multi_strerror(code); printf("curl_multi_add_handle failed%d,%s\n", m_addmultFailed, strerror.c_str()); PutCurl(easyhand);// continue; } //加入成功后将easyhand和task加入map,便于返回结果时通过easyhand去查找任务 m_runningTaskMutex.lock(); m_mapRuningTask.insert({ easyhand, item }); printf("running task size:%d\n", m_mapRuningTask.size()); m_runningTaskMutex.unlock(); listTask.pop_front(); } } int CurlSelectMulti::setTaskParameter(CURL* easyhand, shared_ptr<Task>& task) { CURLcode code = CURLE_OK; do { if (task->iType == HttpType::HTTP_POST || task->iType == HttpType::HTTPS_POST) { code = curl_easy_setopt(easyhand, CURLOPT_POST, 1); //post方法 if (code!= CURLE_OK) { printf("curl_easy_setopt error %d", code); break; } code = curl_easy_setopt(easyhand, CURLOPT_POSTFIELDSIZE, task->strPostContent.size()); if (code != CURLE_OK) { printf("curl_easy_setopt error %d", code); break; } code = curl_easy_setopt(easyhand, CURLOPT_POSTFIELDS, task->strPostContent.data()); if (code != CURLE_OK) { printf("curl_easy_setopt error %d", code); break; } } //设置url code = curl_easy_setopt(easyhand, CURLOPT_URL, task->strUrl.c_str()); if (code != CURLE_OK) { printf("curl_easy_setopt error %d", code); break; } code = curl_easy_setopt(easyhand, CURLOPT_WRITEDATA, (void *)&(task->strResponse)); if (code != CURLE_OK) { printf("curl_easy_setopt error %d", code); break; } //设置协议头 if (task->headers != nullptr) { code=curl_easy_setopt(easyhand, CURLOPT_HTTPHEADER, task->headers); if (code != CURLE_OK) { printf("curl_easy_setopt error %d", code); break; } } } while (0); if (code!=CURLE_OK)//清除掉无效的easyhand { printf("setTaskParameter error"); PutCurl(easyhand); return -1; } return 0; }
(3) 服务封装类使用实例
// CurlMultiServer.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <iostream> #include <fstream> #include <thread> #include "CurlSelectMulti.h" #include "CurlSocketMulti.h" using namespace std; int main() { CurlSelectMulti m_multServer; m_multServer.GlobalInit(); m_multServer.init(); int taskid=0; int time = 0; //while (true) { for (int i = 0; i < 10000; i++) { shared_ptr<Task> task = std::make_shared<Task>(); task->iType = HttpType::HTTPS_GET; task->strUrl = ""; taskid++; task->taskid = taskid; m_multServer.addTask(task); } //Sleep(1000); //printf("using time %ds,task num %d",(time++) * 1, time * 100); } getchar(); return 0; }
(4) 性能测试
在如下电脑配置条件下,请求了某个http请求1万次,所用时间14秒,平均一次1.4毫秒。每秒714次请求调用。
另外还有multi_socket和事件libevent结合的方式,且听下回分解。
标签:异步,multi,task,setopt,easyhand,easy,curl 来源: https://www.cnblogs.com/bclshuai/p/15790038.html