编程语言
首页 > 编程语言> > [源码解析] 消息队列 Kombu 之 基本架构

[源码解析] 消息队列 Kombu 之 基本架构

作者:互联网

Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象,是一个把消息传递封装成统一接口的库。其特点是支持多种的符合APMQ协议的消息队列系统。通过本系列,大家可以了解 Kombu 是如何实现 AMQP。本文先介绍相关概念和整体逻辑架构。

[源码解析] 消息队列 Kombu 之 基本架构


目录


0x00 摘要

从本文开始,我们通过一个系列来介绍消息队列 Kombu(为后续Celery分析打基础)。

Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象,是一个把消息传递封装成统一接口的库。其特点是支持多种的符合APMQ协议的消息队列系统。不仅支持原生的AMQP消息队列如RabbitMQ、Qpid,还支持虚拟的消息队列如redis、mongodb、beantalk、couchdb、in-memory等。

通过本系列,大家可以了解 Kombu 是如何实现 AMQP。本文先介绍相关概念和整体逻辑架构。

0x01 AMQP

介绍 AMQP 是因为 Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息网络协议

1.1 基本概念

AMQP的基本概念如下:

1.2 工作过程

工作过程是:

基本如下图:

                  +----------------------------------------------+
                  |                  AMQP Entity                 |
                  |                                              |
                  |                                              |
                  |                                              |
+-----------+     |    +------------+   binding   +---------+    |       +------------+
|           |     |    |            |             |         |    |       |            |
| Publisher | +------> |  Exchange  | +---------> |  Queue  | +--------> |  Consumer  |
|           |     |    |            |             |         |    |       |            |
+-----------+     |    +------------+             +---------+    |       +------------+
                  |                                              |
                  |                                              |
                  +----------------------------------------------+

0x02 Poll系列模型

Kombu 利用了 Poll 模型,所以我们有必要介绍下。这就是IO多路复用。

IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用比如当客户处理多个描述字时(一般是交互式输入和网络套接口)。

与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

2.1 select

select 通过一个select()系统调用来监视多个文件描述符的数组(在linux中一切事物皆文件,块设备,socket连接等)。

当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位(变成ready),使得进程可以获得这些文件描述符从而进行后续的读写操作(select会不断监视网络接口的某个目录下有多少文件描述符变成ready状态【在网络接口中,过来一个连接就会建立一个'文件'】,变成ready状态后,select就可以操作这个文件描述符了)。

2.2 poll

poll 和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

2.3 epoll

epoll由内核直接支持,可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表 就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了 这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描 述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调 机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

2.4 通俗理解

2.4.1 阻塞I/O模式

阻塞I/O模式下,内核对于I/O事件的处理是阻塞或者唤醒,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。

2.4.2 非阻塞模式

非阻塞忙轮询的I/O方式可以同时处理多个流。我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。

2.4.2.1 代理模式

非阻塞模式下可以把I/O事件交给其他对象(select以及epoll)处理甚至直接忽略。

为了避免CPU空转,可以引进一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流(于是我们可以把“忙”字去掉了)。代码长这样:

 while true {  
       select(streams[])  
       for i in streams[] {  
             if i has data  
             read until unavailable  
        }  
 }

于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。

2.4.2.2 epoll

epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll只会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的(复杂度降低到了O(1))。

epoll版服务器实现原理类似于select版服务器,都是通过某种方式对套接字进行检验其是否能收发数据等。但是epoll版的效率要更高,同时没有上限。

在select、poll中的检验,是一种被动的轮询检验,而epoll中的检验是一种主动地事件通知检测,即:当有套接字符合检验的要求,便会主动通知,从而进行操作。这样的机制自然效率会高一点。

同时在epoll中要用到文件描述符,所谓文件描述符实质上是数字。

epoll的主要用处在于:

epoll_list = epoll.epoll()

如果进程在处理while循环中的代码时,一些套接字对应的客户端如果发来了数据,那么操作系统底层会自动的把这些套接字对应的文件描述符写入该列表中,当进程再次执行到epoll时,就会得到了这个列表,此时这个列表中的信息就表示着哪些套接字可以进行收发了。因为epoll没有去依次的查看,而是直接拿走已经可以收发的fd,所以效率高!

0x03 Kombu 基本概念

Kombu的最初的实现叫做carrot,后来经过重构才成了Kombu。

3.1 用途

Kombu 主要用途如下:

3.2 术语

在 Kombu 中,存在多个概念(部分和AMQP类似),他们分别是:

0x04 概念具体说明

4.1 概述

以 redis 为 broker,我们简要说明:

4.2 Connection

Connection是对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接。现在就是对 'redis://localhost:6379' 连接进行抽象。

conn = Connection('redis://localhost:6379')

由之前论述可知,Connection是到broker的连接。从具体代码可以看出,Connection更接近是一个逻辑概念,具体功能都委托给别人完成。

Connection主要成员变量是:

精简版定义如下:

class Connection:
    """A connection to the broker"""

    port = None

    _connection = None
    _default_channel = None
    _transport = None

    #: Iterator returning the next broker URL to try in the event
    #: of connection failure (initialized by :attr:`failover_strategy`).
    cycle = None

    #: Additional transport specific options,
    #: passed on to the transport instance.
    transport_options = None

    #: Strategy used to select new hosts when reconnecting after connection
    #: failure.  One of "round-robin", "shuffle" or any custom iterator
    #: constantly yielding new URLs to try.
    failover_strategy = 'round-robin'

    #: Heartbeat value, currently only supported by the py-amqp transport.
    heartbeat = None

    failover_strategies = failover_strategies

4.3 Channel

Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连接。就是真正的连接。

Channel 可以认为是 redis 操作和连接的封装。每个 Channel 都可以与 redis 建立一个连接,在此连接之上对 redis 进行操作,每个连接都有一个 socket,每个 socket 都有一个 file,从这个 file 可以进行 poll。

4.3.1 定义

简化版定义如下:

class Channel(virtual.Channel):
    """Redis Channel."""

    QoS = QoS

    _client = None
    _subclient = None
    keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)
    keyprefix_fanout = '/{db}.'
    sep = '\x06\x16'
    _fanout_queues = {}
    unacked_key = '{p}unacked'.format(p=KEY_PREFIX)
    unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)
    unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)
    unacked_mutex_expire = 300  # 5 minutes
    unacked_restore_limit = None
    visibility_timeout = 3600   # 1 hour
    max_connections = 10
    queue_order_strategy = 'round_robin'

    _async_pool = None
    _pool = None

    from_transport_options = (
        virtual.Channel.from_transport_options +
        ('sep',
         'ack_emulation',
         'unacked_key',
		 ......
         'max_connections',
         'health_check_interval',
         'retry_on_timeout',
         'priority_steps')  # <-- do not add comma here!
    )

    connection_class = redis.Connection if redis else None
    
	self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}

4.3.2 redis消息回调函数

关于上面成员变量,这里需要说明的是

 handlers = {dict: 2} 
  {
    'BRPOP':<bound method Channel._brpop_read of >, 
    'LISTEN':<bound method Channel._receive of >
  }

这是redis有消息时的回调函数,即:

大约如下:

            +---------------------------------------------------------------------------------------------------------------------------------------+
            |                                     +--------------+                                   6                       parse_response         |
            |                                +--> | Linux Kernel | +---+                                                                            |
            |                                |    +--------------+     |                                                                            |
            |                                |                         |                                                                            |
            |                                |                         |  event                                                                     |
            |                                |  1                      |                                                                            |
            |                                |                         |  2                                                                         |
            |                                |                         |                                                                            |
    +-------+---+    socket                  +                         |                                                                            |
    |   redis   |  port +-->  fd +--->+                  v                                                                            |
    |           |                                   |           +------+--------+                                                                   |
    |           |    socket                         |           |  Hub          |                                                                   |
    |           |  port +-->  fd +--->----------> |               |                                                                   |
    | port=6379 |                                   |           |               |                                                                   |
    |           |    socket                         |           |     readers +----->  Transport.on_readable                                        |
    |           |  port +-->  fd +--->+           |               |                     +                                             |
    +-----------+                                               +---------------+                     |                                             |
                                                                                                      |                                             |
                                                        3                                             |                                             |
             +----------------------------------------------------------------------------------------+                                             |
             |                                                                                                                                      v
             |                                                                                                                                                  _receive_callback
             |                                                                                                                            5    +-------------+                      +-----------+
+------------+------+                     +-------------------------+                                    'BRPOP' = Channel._brpop_read +-----> | Channel     | +------------------> | Consumer  |
|       Transport   |                     |  MultiChannelPoller     |      +------>  channel . handlers  'LISTEN' = Channel._receive           +-------------+                      +---+-------+
|                   |                     |                         |      |                                                                                           8                |
|                   | on_readable(fileno) |                         |      |                                                                         ^                                  |
|           cycle +---------------------> |          _fd_to_chan +---------------->  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |
|                   |        4            |                         |      |                             'LISTEN' = Channel._receive                 |                                  |
|  _callbacks[queue]|                     |                         |      |                                                                         |                            on_m  |  9
|          +        |                     +-------------------------+      +------>  channel . handlers  'BRPOP' = Channel._brpop_read               |                                  |
+-------------------+                                                                                    'LISTEN' = Channel._receive                 |                                  |
           |                                                                                                                                         |                                  v
           |                                                7           _callback                                                                    |
           +-----------------------------------------------------------------------------------------------------------------------------------------+                            User Function

手机如图:

4.4 Transport

Transport:真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例。就是存储和发送消息的实体,用来区分底层消息队列是用amqp、Redis还是其它实现的。

我们顺着上文理一下:

在Kombu 体系中,用 transport 对所有的 broker 进行了抽象,为不同的 broker 提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。

Transport负责具体操作,但是 很多操作移交给 loop 与 MultiChannelPoller 进行。

其主要成员变量为:

其中重点是MultiChannelPoller。一个Connection有一个Transport, 一个Transport有一个MultiChannelPoller,对poll操作都是由MultiChannelPoller完成,redis操作由channel完成。

定义如下:

class Transport(virtual.Transport):
    """Redis Transport."""

    Channel = Channel

    polling_interval = None  # disable sleep between unsuccessful polls.
    default_port = DEFAULT_PORT
    driver_type = 'redis'
    driver_name = 'redis'

    implements = virtual.Transport.implements.extend(
        asynchronous=True,
        exchange_type=frozenset(['direct', 'topic', 'fanout'])
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # All channels share the same poller.
        self.cycle = MultiChannelPoller()

4.5 MultiChannelPoller

MultiChannelPoller 定义如下,可以理解为 执行 engine,主要作用是:

或者从逻辑上这么理解,MultiChannelPoller 就是:

具体定义如下:

class MultiChannelPoller:
    """Async I/O poller for Redis transport."""

    eventflags = READ | ERR

    def __init__(self):
        # active channels
        self._channels = set()
        # file descriptor -> channel map.
        self._fd_to_chan = {}
        # channel -> socket map
        self._chan_to_sock = {}
        # poll implementation (epoll/kqueue/select)
        self.poller = poll()
        # one-shot callbacks called after reading from socket.
        self.after_read = set()

4.6 Consumer

Consumer 是消息接收者。Consumer & 相关组件 的作用主要如下:

在具体 Consumer 的实现中,它把 queue 与 channel 联系起来。queue 里面有一个 channel,用来访问redis,也有 Exchange,知道访问具体 redis 哪个key(就是queue对应的那个key)。

Consumer 消费消息是通过 Queue 来消费,然后 Queue 又转嫁给 Channel。

所以服务端的逻辑大致为:

  1. 建立连接;
  2. 创建Exchange ;
  3. 创建Queue,并将Exchange与Queue绑定,Queue的名称为routing_key ;
  4. 创建Consumer对Queue监听;

Consumer 定义如下:

class Consumer:
    """Message consumer.

    Arguments:
        channel (kombu.Connection, ChannelT): see :attr:`channel`.
        queues (Sequence[kombu.Queue]): see :attr:`queues`.
        no_ack (bool): see :attr:`no_ack`.
        auto_declare (bool): see :attr:`auto_declare`
        callbacks (Sequence[Callable]): see :attr:`callbacks`.
        on_message (Callable): See :attr:`on_message`
        on_decode_error (Callable): see :attr:`on_decode_error`.
        prefetch_count (int): see :attr:`prefetch_count`.
    """

    ContentDisallowed = ContentDisallowed

    #: The connection/channel to use for this consumer.
    channel = None

    #: A single :class:`~kombu.Queue`, or a list of queues to
    #: consume from.
    queues = None

    #: Flag for automatic message acknowledgment.
    no_ack = None

    #: By default all entities will be declared at instantiation, if you
    #: want to handle this manually you can set this to :const:`False`.
    auto_declare = True

    #: List of callbacks called in order when a message is received.
    callbacks = None

    #: Optional function called whenever a message is received.
    on_message = None

    #: Callback called when a message can't be decoded.
    on_decode_error = None

    #: List of accepted content-types.
    accept = None

    #: Initial prefetch count
    prefetch_count = None

    #: Mapping of queues we consume from.
    _queues = None

    _tags = count(1)   # global

此时总体逻辑如下图:

+----------------------+               +-------------------+
| Consumer             |               | Channel           |
|                      |               |                   |        +-----------------------------------------------------------+
|                      |               |    client  +-------------> | Redis<ConnectionPool<Connection|
|      channel  +--------------------> |                   |        +-----------------------------------------------------------+
|                      |               |    pool           |
|                      |   +---------> |                   |  |    connection +---------------+                                                  |
|        |             |   |    |      |                   |           |                                                  |
+----------------------+   |    |      +-------------------+           |                                                  |
         |                 |    |                                      v                                                  |
         |                 |    |      +-------------------+       +---+-----------------+       +--------------------+   |
         |                 |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
         |                 |    |      |                   |       |                     |       |                    |   |
         |                 |    |      |                   |       |                     |       |     _channels +--------+
         |                 |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
         |                 |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
         |       +-------->+    |      |                   |       |                     |    +------+ poller         |
         |       |              |      +-------------------+       +---------------------+    |  |     after_read     |
         |       |              |                                                             |  |                    |
         |       |              |                                                             |  +--------------------+
         |       |              |      +------------------+                   +---------------+
         |       |              |      | Hub              |                   |
         |       |              |      |                  |                   v
         |       |              |      |                  |            +------+------+
         |       |              |      |      poller +---------------> | _poll       |
         |       |              |      |                  |            |             |         +-------+
         |       |              |      |                  |            |    _poller+---------> |  poll |
         v       |              |      +------------------+            |             |         +-------+
                 |              |                                      +-------------+
    +-------------------+       |      +----------------+
    | Queue      |      |       |      | Exchange       |
    |      _chann+l     |       +----+ |                |
    |                   |              |                |
    |      exchange +----------------> |     channel    |
    |                   |              |                |
    |                   |              |                |
    +-------------------+              +----------------+

手机如下:

现在我们知道:

4.7 Producer

Producer 是消息发送者。Producer中,主要变量是:

class Producer:
    """Message Producer.

    Arguments:
        channel (kombu.Connection, ChannelT): Connection or channel.
        exchange (kombu.entity.Exchange, str): Optional default exchange.
        routing_key (str): Optional default routing key.
    """

    #: Default exchange
    exchange = None

    #: Default routing key.
    routing_key = ''

    #: Default serializer to use. Default is JSON.
    serializer = None

    #: Default compression method.  Disabled by default.
    compression = None

    #: By default, if a defualt exchange is set,
    #: that exchange will be declare when publishing a message.
    auto_declare = True

    #: Basic return callback.
    on_return = None

    #: Set if channel argument was a Connection instance (using
    #: default_channel).
    __connection__ = None

逻辑如图:

+----------------------+               +-------------------+
| Producer             |               | Channel           |
|                      |               |                   |        +-----------------------------------------------------------+
|                      |               |    client  +-------------> | Redis<ConnectionPool<Connection|
|      channel   +------------------>  |                   |        +-----------------------------------------------------------+
|                      |               |    pool           |
|      exchange        |   +---------> |                   |  |    connection +---------------+                                                  |
|             +        |   |    |      |                   |           |                                                  |
+--+-------------------+   |    |      +-------------------+           |                                                  |
   |          |            |    |                                      v                                                  |
   |          |            |    |      +-------------------+       +---+-----------------+       +--------------------+   |
   |          |            |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
   |          +----------------------> |                   |       |                     |       |                    |   |
   |                       |    |      |                   |       |                     |       |     _channels +--------+
   |                       |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
   |                       |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
   |             +-------->+    |      |                   |       |                     |    +------+ poller         |
   |             |              |      +-------------------+       +---------------------+    |  |     after_read     |
   |             |              |                                                             |  |                    |
   |             |              |                                                             |  +--------------------+
   |             |              |      +------------------+                   +---------------+
   |             |              |      | Hub              |                   |
   |             |              |      |                  |                   v
   |             |              |      |                  |            +------+------+
   |             |              |      |      poller +---------------> | _poll       |
   | publish     |              |      |                  |            |             |         +-------+
   +--------------------------------+  |                  |            |    _poller+---------> |  poll |
                 |              |   |  +------------------+            |             |         +-------+
                 |              |   |                                  +-------------+
    +-------------------+       |   +-----> +----------------+
    | Queue      |      |       |           | Exchange       |
    |      _channel     |       +---------+ |                |
    |                   |                   |                |
    |      exchange +-------------------->  |     channel    |
    |                   |                   |                |
    |                   |                   |                |
    +-------------------+                   +----------------+

手机如图:

4.8 Hub

用户可以通过同步方式自行读取消息,如果不想自行读取,也可以通过Hub(本身构建了一个异步消息引擎)读取。

4.8.1 自己的poller

Hub 是一个eventloop,拥有自己的 poller。

前面在 MultiChannelPoller 中间提到了,MultiChannelPoller  会建立了自己内部的 poller。但是实际上在注册时候,Transport 会使用 hub 的 poller,而非 MultiChannelPoller 内部的 poller。

4.8.2 Connection

Connection注册到Hub,一个Connection对应一个Hub。

hub = Hub()
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)

4.8.3 联系

在注册过程中,Hub 把自己内部的 poller 配置在 Transport 之中。这样就通过 transport 内部的 MultiChannelPoller 可以把 Hub . poller 和 Channel 对应的 socket 同poll联系起来,一个 socket 在 linux 系统中就是一个file,就可以进行 poll 操作;

因而,如前面所述,这样 MultiChannelPoller 就可以 打通 Channel ---> socket ---> poll ---> fd ---> 读取 redis 这条通路了,就是如果 redis 有数据来了,MultiChannelPoller 就马上通过 poll 得到通知,就去 redis 读取。

def register_with_event_loop(self, loop):
    self.transport.register_with_event_loop(self.connection, loop)

4.8.4 定义

Hub定义如下:

class Hub:
    """Event loop object.
    """

    def __init__(self, timer=None):
        self.timer = timer if timer is not None else Timer()
        self.readers = {}
        self.writers = {}
        self.on_tick = set()
        self.on_close = set()
        self._ready = set()
        self._create_poller()

    @property
    def poller(self):
        if not self._poller:
            self._create_poller()
        return self._poller

    def _create_poller(self):
        self._poller = poll()
        self._register_fd = self._poller.register
        self._unregister_fd = self._poller.unregister

    def add(self, fd, callback, flags, args=(), consolidate=False):
        fd = fileno(fd)
        try:
            self.poller.register(fd, flags)
        except ValueError:
            self._remove_from_loop(fd)
            raise
        else:
            dest = self.readers if flags & READ else self.writers
            if consolidate:
                self.consolidate.add(fd)
                dest[fd] = None
            else:
                dest[fd] = callback, args

    def run_forever(self):
        self._running = True
        try:
            while 1:
                try:
                    self.run_once()
                except Stop:
                    break
        finally:
            self._running = False

    def run_once(self):
        try:
            next(self.loop)
        except StopIteration:
            self._loop = None

    def create_loop(self, ...):
        readers, writers = self.readers, self.writers
        poll = self.poller.poll

        while 1:
                for fd, event in events or ():
                    cb, cbargs = readers[fd]
                    if isinstance(cb, generator):
                        next(cb)
                        cb(*cbargs)
            else:
                # no sockets yet, startup is probably not done.
                sleep(min(poll_timeout, 0.1))
            yield

0x05 总结

我们通过文字和图例来总结下本文。

5.1 逻辑

5.2 示例图

具体如图,可以看到,

+-------------------+
| Channel           |
|                   |        +-----------------------------------------------------------+
|    client  +-------------> | Redis<ConnectionPool<Connection|
|                   |        +-----------------------------------------------------------+
|                   |
|                   |        +---------------------------------------------------+-+
|    pool  +-------------->  |ConnectionPool<Connection|
|                   |        +---------------------------------------------------+-+
|                   |
|                   |  |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |    + | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+

我们下文用实例来介绍Kombu的启动过程。

因为本文是一个综述,所以大家会发现,一些概念讲解文字会同时出现在后续文章和综述之中。

0xFF 参考

celery 7 优秀开源项目kombu源码分析之registry和entrypoint

IO 多路复用是什么意思?

IO多路复用之select总结

Kombu消息框架

rabbitmq基本原理总结

标签:AMQP,Exchange,队列,self,源码,Kombu,Channel,消息
来源: https://blog.51cto.com/u_15179348/2733983