Brpc代码分析 Server代码详解七
作者:互联网
2021SC@SDUSC
因为brpc使用的是epoll的边缘触发,所以将fd设置为非阻塞,然后设置socket的send,recv buffer大小,然后将当前fd加入到event_dispatcher
在GetGlobalEventDispatcher中,会只进行一次初始化dispatcher的工作,会创建FLAGS_event_dispatcher_num个dispatcher,默认为1,构造函数中会创建epoll fd,然后调用Start
在Start中会启动一个bthread,执行RunThis,RunThis会执行dispatcher的Run
Run中就是循环进行epoll_wait,
然后对判断事件,如果是EPOLLIN事件,那么执行StartInputEvent,如果是EPOLLOUT,则执行HandleEpollOut
到现在为止,dispatcher的Start就执行结束了,然后回到Socket的ResetFileDescriptor,会执行AddConsumer,将socket_id存到epoll_event的data中,然后注册EPOLLIN事件,到这里,server端初始化过程就完成了,接下来就坐等请求到来。
然后回到epoll_wait,当有新请求到来时,epoll_wait返回,遍历每一个事件,执行Socket::StartInputEvent;注册epoll事件时将socketid注册在了epoll_data的u64中,所以首先通过SocketId address到Socket,为了保证一个fd上只有一个bthread在处理,这里引入了一个atomic变量_nevent,通过_nevent判断当前socket是否有bthread正在处理,如果有的话就什么都不做,因为正在处理的线程执行完后会执行新事件,如果没有的话就使用bthread_start_urgent启动一个bthread执行ProcessEvent来处理新消息,此时epoll bthread让出当前worker的处理,worker执行新建的ProcessEvent bthread,而epoll bthread则被steal到另一个worker线程执行。这里就是官网中所说,brpc不区分io线程和worker线程,epoll bthread不负责数据的读取,IO线程的问题在于一个线程同时只能读一个fd,当多个繁忙的fd聚集在一个IO线程中时,一些读取就被延迟了;另外epoll bthread让出当前worker给ProcessEvent bthread,这样使其有更好的cache locality,可以尽快地读取fd上的数据。
ProcessEvent则是调用on_edge_triggered_events,在Socket Create的时候将该回调函数设置为了OnNewConnections。
这里会通过MoreReadEvents循环判断当前fd上是否还有新的事件,然后执行OnNewConnectionsUntilEAGAIN,这里判断是否有新事件的方法就是上文提到的_nevent。
在OnNewConnectionsUntilEAGAIN中,首先通过accept拿到一个已完成的连接,然后从监听socket中获取user,即之前的acceptor,然后创建Socket,回调函数设置为OnNewMessages,接下来的过程和之前创建监听Socket过程一样,执行AddConsumer,将socket_id存到epoll_event的data中,注册EPOLLIN事件,然后这个新连接有数据来了之后会在epoll_wait返回,执行ProcessEvent,调用到OnNewMessages。
InputMessenger负责从fd上切割和处理消息,它通过用户回调函数理解不同的格式。Parse一般是把消息从二进制流上切割下来,运行时间较固定;Process则是进一步解析消息(比如反序列化为protobuf)后调用用户回调,时间不确定。若一次从某个fd读取出n个消息(n > 1),InputMessenger会启动n-1个bthread分别处理前n-1个消息,最后一个消息则会在原地被Process。InputMessenger会逐一尝试多种协议,由于一个连接上往往只有一种消息格式,InputMessenger会记录下上次的选择,而避免每次都重复尝试。
可以看到,fd间和fd内的消息都会在brpc中获得并发,这使brpc非常擅长大消息的读取,在高负载时仍能及时处理不同来源的消息,减少长尾的存在。
这里的handler就是一开始初始化注册的所有server端协议。进入while循环,计算一次读取数据的长度,DoRead会执行_read_buf.append_from_file_descriptor,_read_buf是IOPortal类型,如上篇博客所讲,这个方法会调用readv将fd中的数据读入到iobuf的block中。
因为这一次数据读取可能会包含多个消息,因此下面会有另一个while循环,每次调用CutInputMessage尝试从iobuf中切割一条消息,如上文,server端是支持多协议的,所以这里第一次会尝试使用所有的协议进行一次parse,因为大多数情况下一个连接上只有一种协议,因此尝试一遍之后会记录下来执行成功的协议,之后将首先尝试记录的协议。
这里的parse就是上文提到的注册协议中的parse方法,这里以协议baidu_std为例简单介绍一下,baidu_std官方介绍在这里https://github.com/apache/incubator-brpc/blob/master/docs/cn/baidu_std.md,可以看到判断是否是baidu_std的方法就是判断前4 个字节是否为”prpc”,这里的copy_to为显式拷贝,因为在判断是否为baidu_std协议的过程中不能消费数据,否则可能会影响其他协议解析;如果iobuf中的数据不够4个字节且是”prpc”的前缀,那么返回PARSE_ERROR_NOT_ENOUGH_DATA错误,这个表示到目前为止不违反当前协议,但是数据不足一个消息,因此会触发重新DoRead的过程;如果前4个字节就是”prpc”,那么满足baidu_std协议,接下来将解析包体长度和包体中的元数据包长度存入到body_size和meta_size中,如果iobuf中数据长度不足body_size + 12(4 + 4 + 4),那么同样会触发重读,否则将iobuf中的meta和其他数据切割到msg中并返回,这里是零拷贝。
如果一次DoRead读入了n条消息,那么前n-1条消息会通过QueueMessage后台启动了n-1个bthread进行处理,而最后一个消息会被析构函数RunLastMessage执行,原地执行process函数,即协议中的process。
还是以baidu_std为例,首先是从iobuf中解析pb格式的meta,
然后创建req,res,Controller,然后进行流控,解析req,如果有attachment也解析出来,最后调用CallMethod,CallMethod方法在编译protobuf时生成,会调用到用户定义的Echo方法
最后在Echo中done_guard析构时会调用done的Run方法,发送response到client。
标签:baidu,bthread,Brpc,epoll,代码,Server,fd,消息,执行 来源: https://blog.csdn.net/m0_54588463/article/details/121322108