编程语言
首页 > 编程语言> > C/C++编程:ZeroMQ线程间收发命令源码分析

C/C++编程:ZeroMQ线程间收发命令源码分析

作者:互联网

C/C++编程:ZeroMQ架构可以看到,线程间通信包括两类:

命令的发送和存储是通过mailbox_t实现的,消息的发送和存储是通过pipe_t实现的。限制先说一下线程间的收发命令。

zeromq的线程可以分为两类:

int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)

另外,两类线程发送命令的方式是一致的。下面,就详细的说一下命令结构、如何发送命令、两类线程如何接收命令

命令

先看一下命令结构(详细的结构参见源码Command.hpp):

//  This structure defines the commands that can be sent between threads.
    struct command_t
    {
        //  Object to process the command.
        zmq::object_t *destination;

        enum type_t
        {
           ...
        } type;

        union {
           ...
        } args;
    };

可以看到,命令由三部分组成,分别是发往目的地destination,命令的类型type,命令的参数args。所谓的命令就是一个对象交代另一个对象去做某些事情,说白了就是告诉一个对象应该调用哪个方法,命令的发出者是一个对象,而接受者是一个线程,线程接收到命令后,根据目的地派发给相应的对象做处理,可以看到命令的destination属性是object_t类型的,在上节介绍类的层次结构图时,说到object_t以及其子类都具有发送和处理命令的功能(没有接收命令的功能),所以有必要弄清楚一件事,对象、object_t、poller、线程、mailbox_t、命令是什么关系?

发命令

一个对象想使用线程的发命令功能,其类就得继承自object_t(源码在Object.hpp/.cpp):

class object_t
    {
    public:
        object_t (zmq::ctx_t *ctx_, uint32_t tid_);
        void process_command (zmq::command_t &cmd_);
        ...
    protected:
        ...
    private:
        zmq::ctx_t *ctx;//  Context provides access to the global state.
        uint32_t tid;//  Thread ID of the thread the object belongs to.
        void send_command (command_t &cmd_);
    }

可以看到:

zmq线程中的mailbox_t对象会被zmq存储在ctx_t对象中。zmq的做法就是,在上下文语境中使用一个容器slots装载线程的mailbox,在新建线程的时候,给线程分配一个线程标志tid和mailbox,把mailbox放入容器的tid那个位置,代码来说就是slots[tid]=mailbox。有了这个基础,线程A给线程B发命令就只要往slots[B.tid]写入命令就可以了:

void zmq::object_t::send_command (command_t &cmd_)
{
    ctx->send_command (cmd_.destination->get_tid (), cmd_);
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
    slots [tid_]->send (command_);
}
void zmq::mailbox_t::send (const command_t &cmd_)
{
    sync.lock();
    cpipe.write (cmd_, false);
    bool ok = cpipe.flush ();
    sync.unlock ();
    if (!ok)
        signaler.send ();
}

io线程收命令

前面说过,每个io线程都含有一个poller,io线程的结构如下(源码在Io_thread_t.hpp/.cpp):

class io_thread_t : public object_t, public i_poll_events
    {
    public:
        io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
        ~io_thread_t ();
        void start (); //  Launch the physical thread.
        void stop ();//  Ask underlying thread to stop.
        ...
    private:
        mailbox_t mailbox;//  I/O thread accesses incoming commands via this mailbox.
        poller_t::handle_t mailbox_handle;//  Handle associated with mailbox' file descriptor.
        poller_t *poller;//  I/O multiplexing is performed using a poller object.
    }

zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_)
{
    poller = new (std::nothrow) poller_t;
    alloc_assert (poller);

    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);
}

构造函数中把mailbox_t句柄放入poller中,让poller监听其读事件,所以,如果有信号发过来,poller会被唤醒,并调用io_thread_t的in_event:

void zmq::io_thread_t::in_event ()
{
    //  TODO: Do we want to limit number of commands I/O thread can
    //  process in a single go?

    command_t cmd;
    int rc = mailbox.recv (&cmd, 0);

    while (rc == 0 || errno == EINTR) {//如果读管道中有内容或者等待信号的时候被中断,将一直读取
        if (rc == 0)
            cmd.destination->process_command (cmd);
        rc = mailbox.recv (&cmd, 0);
    }

    errno_assert (rc != 0 && errno == EAGAIN);
}

可以看到,in_event使用了mailbox_t的接收命令的功能。接收到命令之后,调用destination处理命令的功能去处理命令。

socket_base_t线程收命令

之前说到socket_base_t的每个实例都可以看成是一个zmq线程,但是比较特殊,并使用使用poller,而是在使用到socket的下面几个方法的时候去检测是否有未处理的命令:

int zmq::socket_base_t::getsockopt (int option_, void *optval_,size_t *optvallen_)
int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::term_endpoint (const char *addr_)
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
void zmq::socket_base_t::in_event ()//这个函数只有在销毁socke的时候会被用到,在后面讲zmq_close的时候会说到

检查的手段就是调用process_commands方法:

int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{
    int rc;
    command_t cmd;
    if (timeout_ != 0) {
        //  If we are asked to wait, simply ask mailbox to wait.
        rc = mailbox.recv (&cmd, timeout_);
    }
    else {
        some code
        rc = mailbox.recv (&cmd, 0);
    }
    //  Process all available commands.
    while (rc == 0) {
        cmd.destination->process_command (cmd);
        rc = mailbox.recv (&cmd, 0);
    }
    some code
}

可见,最终都是使用mailbox_t的接收命令的功能。

这里有一个值得思考的问题,为什么socket_base_t实例这个线程不使用poller呢?每次使用上面那些方法的时候去检查不是很麻烦吗?

说一下个人理解,不见得正确。socket_base_t实例之所以被认为是一个特殊的线程,是因为其和io_thread_t一样,都具有收发命令的功能,(关于这点可以看一下io_thread_t的源码,可以发现其主要功能就是收发命令),但是socket_base_t实例是由用户线程创建的,也就是依附于用户线程,而zmq中所有通信都是异步了,所以用户线程是不能被阻塞的,一旦使用poller,线程将被阻塞,也就违背了设计初衷。

mailbox_t

上面说到线程间收发命令都是通过mailbox_t实现的,现在就来看看mailbox_t到底是如何实现的,mailbox_t的声明如下(源码位于Mailbox.hpp/.cpp)

    class mailbox_t
    {
    public:
        mailbox_t ();
        ~mailbox_t ();
        fd_t get_fd ();
        void send (const command_t &cmd_);
        int recv (command_t *cmd_, int timeout_);
    private:
        typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t; //  The pipe to store actual commands.
        cpipe_t cpipe;
        signaler_t signaler;//  信号器将信号从写线程传递到读线程。


        mutex_t sync;//只有一个线程从mailbox中接受消息,但是会有大量的线程往mailbox中发送消息,鉴于ypipe需要同步访问两端的两端,我们必须同步发送端
        bool active; //  如果基础管道处于活动状态,即允许我们从中读取命令,则为True。

        //  Disable copying of mailbox_t object.
        mailbox_t (const mailbox_t&);
        const mailbox_t &operator = (const mailbox_t&);
    };

mailbox_t中的有几个属性很关键,有必要说一下

先来想一个问题,既然signaler可作为信号通知,为何还要active这个属性?然后带着问题看源码

现在来看,线程th1如何向线程th2发送命令?在zmq中是这么做的,th1先把命令写入th2的管道cpipe中,然后刷新th2的管道,再使用signaler发送一个信号给th2,告诉th2我向你的管道写了一个命令,你可以去管道读命令了。

void zmq::mailbox_t::send (const command_t &cmd_)
{
    sync.lock();//互斥写命令端
    //关于cpipe的详细实现,会在下一篇详细的介绍,现在只需要知道函数的功能就可以了
    cpipe.write (cmd_, false);//向接受送方mailbox_t管道写入命令,在没有调用flush之前,接收方看不到这个命令
    bool ok = cpipe.flush ();//刷新管道,这个时候接收方能看到刚才那条命令了
    sync.unlock ();
    if (!ok)
        signaler.send ();//发送信号给接受命令的一方
}

再说th2读命令,如果th2是socket_base_t实例线程,先调用process_commands,process_commands会循环调度process_commands的recv函数,直到没有命令可读时退出循环;如果th2是io_thread_t这类信息,会有poller监听信号的到来,然后调用线程的in_event,in_event又会循环调用mailbox_t的recv函数,直到没有命令可读时退出循环,并睡眠,等待再次被信号唤醒。需要注意的是,这两类线程对发送过的信号都在mailbox_t的recv函数中处理的。现在就来看一下mailbox_t是如何接收命令的:

int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
    //  Try to get the command straight away.
    if (active) {
        bool ok = cpipe.read (cmd_);
        if (ok)
            return 0;

        //  If there are no more commands available, switch into passive state.
        //  没有命令可读时,先把信箱设置为未激活状态,表示没命令可读,然后把对方发过来的激活信箱的信号处理一下(没什么特殊的处理,就是接受一下)
        active = false;
        signaler.recv ();
    }

    //  Wait for signal from the command sender.
    int rc = signaler.wait (timeout_);//signaler.wait的返回值有三种①wait函数出错,返回-1,并且设置errno=EINTR②返回-1并且errno=EAGAIN,表示信号没等到③等到信号。
    if (rc != 0 && (errno == EAGAIN || errno == EINTR))//这里对应wait的前两种情况
        return -1;

    //  We've got the signal. Now we can switch into active state.
    active = true;//等到激活信箱的信号了,激活信箱

    //  Get a command.
    errno_assert (rc == 0);
    bool ok = cpipe.read (cmd_);
    zmq_assert (ok);
    return 0;
}

从代码上来看,recv是这样工作的,先检查信箱是否激活,如果已经被激活,直接读命令退出;如果没激活,先去等激活信号,等到了就读命令退出,没等到就直接退出。需要注意的是,调用recv的函数都在recv上包裹了一个while,大概是这种形式while(true){ mailbox.recv() ;},(可以看上面源码是怎么调用recv的),也就是调用者会一直调用recv读命令,直到读不出命令为止,然后把激活信号取走,把信箱设置未激活态。这就是接收命令的流程。

所以,active和signaler是这样合作的:写命令线程每写一条命令,先去检查读命令线程是否阻塞,如果阻塞,会调用读命令线程mailbox_t中的signaler,发送一个激活读线程mailbox_t的信号,读线程收到这个命令后在recv函数中把activ设置为true,这时,读线程循环调用recv的时候,发现active为true,就会一直读命令,直到没命令可读时,又把active设置为false,等待下一次信号到来。

现在可以回答上面那个问题了,active是否多余?

先试想一下如果不使用active,每写一条命令都必须发送一个信号读读线程,在大并发的情况下,这也是一笔消耗。而使用active,只需要在读线程睡眠的时候(没有命令可读时,io_thread_t这类线程会睡眠,socket_base_t实例线程特殊,不会睡眠)发送信号唤醒读线程就可以,可以节省大量的资源。

标签:cmd,C++,mailbox,命令,源码,线程,command,zmq
来源: https://blog.csdn.net/zhizhengguan/article/details/120809729