Celery is the most popular asynchronous task/message queue framework in Python. It supports RabbitMQ, Redis, ZooKeeper and others as the broker, and the abstraction over these message queues is implemented by Kombu. Kombu provides a uniform interface over the AMQP transport and non-AMQP transports (Redis, Amazon SQS, ZooKeeper, etc.).

The AMQP concepts Message, Producer, Exchange, Queue, Consumer, Connection and Channel all have corresponding implementations in Kombu. In addition, Kombu implements Transport, the entity that actually stores and delivers messages, which is what distinguishes whether the underlying message queue is amqp, Redis or something else.

Message: the message, the unit that is published and consumed
Producer: the message sender
Consumer: the message receiver
Exchange: the exchange; producers publish messages to an Exchange, and the Exchange routes them to queues
Queue: the message queue, holding messages waiting to be consumed; the Exchange routes messages into Queues, and consumers receive messages from Queues
Connection: an abstraction of the connection to the message queue; think of it as the hub shared by all the roles (producers, consumers, etc.)
Channel: similar to the AMQP concept; think of it as multiple lightweight connections sharing one Connection
Transport: the real MQ connection; it distinguishes the underlying message-queue implementation and provides the support for each of them

********************************************************************part1:official example************************************************************************
Code example
Let's start with the example from the official documentation:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    message.ack()

# connections

with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

The above walks through one complete cycle of producing and consuming a task.
Extension: a single consumer consuming messages from several queues
# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()
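If each queue needs its own handler, a reasonable variant is one Consumer per queue on a shared channel. This is a hedged sketch, not from the original article: process_image and the explicit connection.channel() call are illustrative assumptions, and connection is reused from the snippet above.

from kombu import Consumer

def process_image(body, message):
    print(body)
    message.ack()

channel = connection.channel()
with Consumer(channel, [video_queue], callbacks=[process_media]), \
     Consumer(channel, [image_queue], callbacks=[process_image]):
    while True:
        connection.drain_events()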

************************************************************part2: roles explanation************************************************************
By now essentially every role has made an appearance. Using any of them starts with establishing a Connection.
Initializing a Connection is the entry point to the messaging library; it is used like this:
>>> from kombu import Connection
>>> conn = Connection('amqp://guest:guest@localhost:5672//')
******************************************************************************************************************************************************
****Connection, capability 1: initialize the Transport object. The Transport knows which broker it is talking to, and is used to produce channel objects that are agnostic of the underlying message-queue implementation (the channel is the product of the Transport factory).
When a Connection object is initialized, its Transport attribute is not created right away; it only comes into being when conn.connect() is called or when the connection property is accessed. This is Python-style lazy loading (also known as lazy initialization; generators are the everyday example).
The benefit of lazy loading is that the work is postponed until it absolutely has to happen, which is a good strategy when the operation is expensive.
So at this point the connection has not really been established; it is only established, and then cached, at the moment it is actually needed.
The lazy loading is implemented with the property descriptor:
@property
def connection(self):
    """The underlying connection object.
    Warning:
        This instance is transport specific, so do not
        depend on the interface of this object.
    """
    if not self._closed:
        if not self.connected:
            self.declared_entities.clear()
            self._default_channel = None
            self._connection = self._establish_connection()
            self._closed = False
        return self._connection
Triggering the load:
>>> conn.connect()
def connect(self):
    """Establish connection to server immediately."""
    self._closed = False
    return self.connection
In practice:
>>> from kombu import Connection, Exchange, Queue
>>> conn = Connection('redis://root:***@localhost//')
>>> conn._connection
>>> conn.connect()
<kombu.transport.redis.Transport object at 0x7f3681133ac0>
>>> conn._connection
<kombu.transport.redis.Transport object at 0x7f3681133ac0>
>>> conn.transport
<kombu.transport.redis.Transport object at 0x7f3681133ac0>
Through the Transport we can create a channel (the low-level connection object). Creating a channel via the Transport looks like this:
conn_inst = self.transport.establish_connection()
# python3\Lib\site-packages\kombu\transport\virtual\base.py
def establish_connection(self):
    # creates channel to verify connection.
    # this channel is then used as the next requested channel.
    # (returned by ``create_channel``).
    self._avail_channels.append(self.create_channel(self))
    return self
Calling the Transport's establish_connection creates a channel and appends it to the Transport's _avail_channels list, so the channels are managed centrally by the Transport.
In practice:
>>> conn.transport.establish_connection()
<kombu.transport.redis.Transport object at 0x7f3681133ac0>
>>> conn.transport._avail_channels
[<kombu.transport.redis.Channel object at 0x7f36811816d0>]

Channel connections need to be closed explicitly:
>>> conn.release()    # calls channel.close() on every channel held by the Transport

Because Connection implements the context manager protocol:
def __enter__(self):
    return self
def __exit__(self, *args):
    self.release()
it can be used in a with statement, so the channel connections cannot be forgotten. Usage:
with Connection() as connection:
    # work with connection
    

******************************************************************************************************************************************************
****Connection, capability 2: create the Producer and the Consumer from the Connection
producer = conn.Producer(serializer='json')
consumer = conn.Consumer(video_queue, callbacks=[process_media])
Source:
def Producer(self, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Producer` instance."""
    from .messaging import Producer
    return Producer(channel or self, *args, **kwargs)

def Consumer(self, queues=None, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Consumer` instance."""
    from .messaging import Consumer
    return Consumer(channel or self, queues, *args, **kwargs)
(1) Creating a Producer
Option 1: once the connection exists, create the Producer from it:
producer = conn.Producer(serializer='json')
Why go through the Connection object to create the Producer?
The producer needs a connection to the message queue, so it uses the Connection object to obtain a channel managed by the Transport.
When the producer object is initialized it calls revive:
    def revive(self, channel):
        """Revive the producer after connection loss."""
        # is_connection checks whether `channel` (the object passed at
        # construction time) is a Connection; if so, the channel managed by
        # the Transport is assigned (lazily, via ChannelPromise) to the
        # Producer's _channel attribute
        if is_connection(channel):
            connection = channel
            self.__connection__ = connection
            channel = ChannelPromise(lambda: connection.default_channel)
        if isinstance(channel, ChannelPromise):
            self._channel = channel
            self.exchange = self.exchange(channel)
        else:
            # Channel already concrete
            self._channel = channel
            if self.on_return:
                self._channel.events['basic_return'].add(self.on_return)
            self.exchange = self.exchange(channel)
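As the revive logic above suggests, the Producer accepts either a Connection (resolved lazily to its default channel through a ChannelPromise) or an already concrete channel. A minimal sketch of both forms, assuming a local Redis broker (the URL is illustrative):

from kombu import Connection
from kombu.messaging import Producer

with Connection('redis://localhost:6379//') as conn:
    # pass the Connection: the channel is resolved lazily via a ChannelPromise
    p1 = Producer(conn, serializer='json')
    # or pass an already-established channel explicitly
    chan = conn.channel()
    p2 = Producer(chan, serializer='json')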
# The next step involves the Queue and Exchange objects, so let's introduce them first:
*************************************************************************role queue and exchange*******************************************************************
Exchange: the exchange; producers publish messages to an Exchange, and the Exchange routes them to queues
Queue: the message queue, holding messages waiting to be consumed; the Exchange routes messages into Queues, and consumers receive messages from Queues
Note first: the Exchange and Queue objects are not created through the Connection object; they are created like this:
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
<1> Exchange:
How an Exchange is initialized:
    def __init__(self, name='', type='', channel=None, **kwargs):
        super().__init__(**kwargs)
        self.name = name or self.name    # exchange name
        self.type = type or self.type    # exchange type (routing strategy)
        self.maybe_bind(channel)    # if a channel object is passed in, bind to it
<2> Queue:
How a Queue is initialized:
    def __init__(self, name='', exchange=None, routing_key='',
                 channel=None, bindings=None, on_declared=None,
                 **kwargs):
        super().__init__(**kwargs)
        self.name = name or self.name
        if isinstance(exchange, str):
            self.exchange = Exchange(exchange)
        elif isinstance(exchange, Exchange):
            self.exchange = exchange
        self.routing_key = routing_key or self.routing_key
        self.bindings = set(bindings or [])
        self.on_declared = on_declared

        # allows Queue('name', [binding(...), binding(...), ...])
        if isinstance(exchange, (list, tuple, set)):
            self.bindings |= set(exchange)
        if self.bindings:
            self.exchange = None

        # exclusive implies auto-delete.
        if self.exclusive:
            self.auto_delete = True
        self.maybe_bind(channel)
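To illustrate the two declaration styles this constructor supports, here is a hedged sketch; kombu.binding is assumed to be importable as in recent Kombu releases:

from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', 'direct', durable=True)

# style 1: one exchange plus a routing key
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

# style 2: a list of bindings; note that self.exchange is then reset to None
media_queue = Queue('media-all', [
    binding(media_exchange, routing_key='video'),
    binding(media_exchange, routing_key='image'),
])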
*********************************************************************************************************************************************************************
Back to the main thread: how the Producer publishes a message. Usage:
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                  exchange=media_exchange, routing_key='video',
                  declare=[video_queue])
Source:
def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    channel = self.channel
    # channel.prepare_message returns a dict wrapping the payload:
    # {'body': body, 'content-encoding': content_encoding,
    #  'content-type': content_type, 'headers': headers or {},
    #  'properties': properties or {}}
    # This is the first rewrite of the message, adding the extra metadata.
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )
    # If queues were declared (e.g. video_queue), they are declared before
    # publishing; an entity such as video_queue that is not yet bound to a
    # channel is bound to the producer's channel by maybe_declare first.
    if declare:
        maybe_declare = self.maybe_declare
        [maybe_declare(entity) for entity in declare]

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    # the rest is left to the channel object
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )

********************************************************************************role channel *********************************************************************
The Channel, in turn, is created by the Transport, and is unaware of how the underlying queue is stored.
How it is created:
chan = self.transport.create_channel(self.connection)
First, how the Transport itself gets created.
When a Connection is created you pass a hostname, such as: amqp://guest:guest@localhost:5672//
The scheme of that hostname (for example redis) determines which Transport class is created:
transport = transport or urlparse(hostname).scheme
Source of the Transport creation path:

self.transport_cls = transport

transport_cls = get_transport_cls(transport_cls)

def get_transport_cls(transport=None):
    """Get transport class by name.

    The transport string is the full path to a transport class, e.g.::

    ┆   "kombu.transport.pyamqp:Transport"

    If the name does not include `"."` (is not fully qualified),
    the alias table will be consulted.
    """
    if transport not in _transport_cache:
        _transport_cache[transport] = resolve_transport(transport)
    return _transport_cache[transport]

transport = TRANSPORT_ALIASES[transport]

TRANSPORT_ALIASES = {
    ...

    'redis': 'kombu.transport.redis:Transport',
    
    ...
}
Taking Redis as an example, the Transport class lives in kombu/transport/redis.py and inherits from the Transport class in kombu/transport/virtual/base.py.
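A quick sketch of how the scheme resolves to a Transport class via the alias table (get_transport_cls is the function quoted above; the 'memory' alias is assumed to exist as in standard Kombu):

from kombu.transport import get_transport_cls

print(get_transport_cls('redis'))   # <class 'kombu.transport.redis.Transport'>
print(get_transport_cls('memory'))  # <class 'kombu.transport.memory.Transport'>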

Creating the Channel object:
class Channel(AbstractChannel, base.StdChannel):
    def __init__(self, connection, **kwargs):
        self.connection = connection
        self._consumers = set()
        self._cycle = None 
        self._tag_to_queue = {} 
        self._active_queues = [] 
        self._qos = None
        self.closed = False
        ... 
Here _consumers is the set of associated consumer tags, _active_queues is the list of associated Queues, and _tag_to_queue maps consumer tags to queues:

When a Consumer calls consume(), it wires itself up to the channel like this:
self._tag_to_queue[consumer_tag] = queue
self._consumers.add(consumer_tag)
self._active_queues.append(queue)

Publishing a message: the basic_publish method:
    def basic_publish(self, message, exchange, routing_key, **kwargs):
        """Publish message."""
        self._inplace_augment_message(message, exchange, routing_key)
        
        if exchange:
            return self.typeof(exchange).deliver(
                message, exchange, routing_key, **kwargs
            )
        # anon exchange: routing_key is the destination queue
        return self._put(routing_key, message, **kwargs)
If an exchange is given, the exchange is used to deliver the message. There are three standard exchange types:
STANDARD_EXCHANGE_TYPES = {
    'direct': DirectExchange,
    'topic': TopicExchange,
    'fanout': FanoutExchange,
}
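As a reminder of the standard AMQP routing semantics these three classes emulate (a hedged sketch; the exchange declarations below are illustrative):

from kombu import Exchange

# direct: the routing key must match the binding key exactly
tasks = Exchange('tasks', type='direct')
# topic: the routing key is matched against patterns with '*' and '#' wildcards
logs = Exchange('logs', type='topic')
# fanout: the routing key is ignored and every bound queue gets a copy
broadcast = Exchange('broadcast', type='fanout')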

If no exchange is given, the message is pushed straight into the queue named by routing_key:
def _put(self, queue, message, **kwargs):
    """Deliver message."""
    pri = self._get_message_priority(message, reverse=False)

    with self.conn_or_acquire() as client:
        client.lpush(self._q_for_pri(queue, pri), dumps(message))
client is a redis.StrictRedis connection:

def _create_client(self, asynchronous=False):
    if asynchronous:
        return self.Client(connection_pool=self.async_pool)
    return self.Client(connection_pool=self.pool)

self.Client = self._get_client()

def _get_client(self):
    if redis.VERSION < (3, 2, 0):
        raise VersionMismatch(
            'Redis transport requires redis-py versions 3.2.0 or later. '
            'You have {0.__version__}'.format(redis))
    return redis.StrictRedis
So the Redis transport simply LPUSHes the message onto a list, and picks a different connection_pool depending on the asynchronous flag.
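You can see this for yourself with plain redis-py after publishing through the redis transport. This is a hedged sketch; it assumes a local Redis on the default port and a queue named 'video' with default, non-priority settings:

import redis

r = redis.StrictRedis()
print(r.llen('video'))       # number of pending messages in the queue's list
print(r.lindex('video', 0))  # the raw JSON-wrapped message that Kombu LPUSHed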

Consumer
Now that the message has been placed in a queue, how does it actually get consumed?

A Consumer is initialized with a Channel (or Connection), the list of queues to consume from, and a list of callbacks for handling messages:

with Consumer(connection, queues, callbacks=[process_media], accept=['json']):
    connection.drain_events(timeout=1)
When the Consumer instance is used as a context manager, its consume method is called:

def __enter__(self):
    self.consume()
    return self
The consume method:

def consume(self, no_ack=None):
    """Start consuming messages.

    Can be called multiple times, but note that while it
    will consume from new queues added since the last call,
    it will not cancel consuming from removed queues (
    use :meth:`cancel_by_queue`).

    Arguments:
        no_ack (bool): See :attr:`no_ack`.
    """
    queues = list(values(self._queues))
    if queues:
        no_ack = self.no_ack if no_ack is None else no_ack

        H, T = queues[:-1], queues[-1]
        for queue in H:
            self._basic_consume(queue, no_ack=no_ack, nowait=True)
        self._basic_consume(T, no_ack=no_ack, nowait=False)
Each queue in the list is handled by _basic_consume; only the last Queue is processed with nowait=False.

The _basic_consume method:

def _basic_consume(self, queue, consumer_tag=None,
                   no_ack=no_ack, nowait=True):
    tag = self._active_tags.get(queue.name)
    if tag is None:
        tag = self._add_tag(queue, consumer_tag)
        queue.consume(tag, self._receive_callback,
                      no_ack=no_ack, nowait=nowait)
    return tag
It passes the consumer tag and the callback down to the Queue's consume method.

The Queue's consume method:

def consume(self, consumer_tag='', callback=None, 
            no_ack=None, nowait=False):     
    """Start a queue consumer.      

    Consumers last as long as the channel they were created on, or
    until the client cancels them.

    Arguments:             
        consumer_tag (str): Unique identifier for the consumer.
            The consumer tag is local to a connection, so two clients
            can use the same consumer tags. If this field is empty
            the server will generate a unique tag.

        no_ack (bool): If enabled the broker will automatically
            ack messages.

        nowait (bool): Do not wait for a reply.

        callback (Callable): callback called for each delivered message.
    """
    if no_ack is None:
        no_ack = self.no_ack            
    return self.channel.basic_consume(
        queue=self.name,
        no_ack=no_ack,
        consumer_tag=consumer_tag or '',
        # callback here is the Consumer's _receive_callback; it mainly runs
        # message = m2p(message), i.e. converts the raw message into a
        # Message instance, and then, crucially, invokes the functions
        # registered in the Consumer's callbacks attribute
        callback=callback,
        nowait=nowait,
        arguments=self.consumer_arguments)
And we are back at the Channel. The Channel's basic_consume:

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
    """Consume from `queue`."""     
    self._tag_to_queue[consumer_tag] = queue
    self._active_queues.append(queue)

    def _callback(raw_message):         
        message = self.Message(raw_message, channel=self)
        if not no_ack:
            self.qos.append(message, message.delivery_tag)
        return callback(message)                            

    self.connection._callbacks[queue] = _callback  # when invoked, _callback wraps the raw message in a Message instance
    self._consumers.add(consumer_tag)

    self._reset_cycle()   
The Channel records the consumer tags, the queues the consumers want, and the mapping between the two, waiting for the polling loop. It also registers the queue-to-callback mapping on the Transport, so that once a message is taken from a queue the right callbacks can be executed.
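A small inspection sketch of this bookkeeping, using the in-memory transport so no broker is needed (the attribute names are the ones from the source above; treat the exact reprs as indicative only):

from kombu import Connection, Exchange, Queue

with Connection('memory://') as conn:
    chan = conn.channel()
    queue = Queue('demo', Exchange('demo', 'direct'), routing_key='demo')
    bound = queue(chan)                      # bind the queue to a concrete channel
    bound.declare()
    bound.consume('tag-1', callback=lambda message: None)
    print(chan._tag_to_queue)                # {'tag-1': 'demo'}
    print(chan._consumers)                   # {'tag-1'}
    print(conn.transport._callbacks.keys())  # dict_keys(['demo'])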

The actual driving call is this line:

connection.drain_events(timeout=1)
which brings us to the Transport's drain_events method:

def drain_events(self, connection, timeout=None):
    time_start = monotonic()
    get = self.cycle.get
    polling_interval = self.polling_interval
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while 1:
        try: 
            get(self._deliver, timeout=timeout)
        except Empty:
            if timeout is not None and monotonic() - time_start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                sleep(polling_interval)
        else:
            break
This looks like it runs get(self._deliver, timeout=timeout) forever.

get is a method of self.cycle, and cycle is a FairCycle instance:

self.cycle = self.Cycle(self._drain_channel, self.channels, Empty)

@python_2_unicode_compatible
class FairCycle(object):
    """Cycle between resources.

    Consume from a set of resources, where each resource gets
    an equal chance to be consumed from.

    Arguments: 
        fun (Callable): Callback to call.
        resources (Sequence[Any]): List of resources.
        predicate (type): Exception predicate.
    """

    def __init__(self, fun, resources, predicate=Exception):
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def _next(self):
        while 1:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs):
        """Get from next resource."""
        for tried in count(0):  # for infinity
            resource = self._next()
            try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                # reraise when retries exhausted.
                if tried >= len(self.resources) - 1:
                    raise
FairCycle takes two main arguments: fun, the function to execute, and resources, which is cycled through, handing fun one item at a time.
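A toy sketch of FairCycle on its own (not kombu internals; it assumes FairCycle is importable from kombu.utils.scheduling, where it lives in recent releases):

from queue import Empty
from kombu.utils.scheduling import FairCycle

# each "resource" is just a list; FairCycle gives every list a fair turn
buffers = [['a1', 'a2'], ['b1']]

def fetch(buf, callback):
    if not buf:
        raise Empty()
    callback(buf.pop(0))

cycle = FairCycle(fetch, buffers, predicate=Empty)
cycle.get(print)   # a1  (taken from the first list)
cycle.get(print)   # b1  (taken from the second list)
cycle.get(print)   # a2  (back to the first list)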

Here fun is _drain_channel and resources is channels:

def _drain_channel(self, channel, callback, timeout=None):
    return channel.drain_events(callback=callback, timeout=timeout)
Every channel associated with the Transport has its drain_events executed.

The Channel's drain_events code:

def drain_events(self, timeout=None, callback=None):
    callback = callback or self.connection._deliver
    if self._consumers and self.qos.can_consume():
        if hasattr(self, '_get_many'):
            return self._get_many(self._active_queues, timeout=timeout)
        return self._poll(self.cycle, callback, timeout=timeout)
    raise Empty()
    
def can_consume(self):
    """Return true if the channel can be consumed from.

    Used to ensure the client adheres to currently active
    prefetch limits.
    """
    pcount = self.prefetch_count    # prefetch_count caps concurrently unacknowledged messages; 0 means no limit
    # self._delivered maps delivery tags to messages handed out to consumers;
    # self._dirty holds the tags of messages already acked, i.e. fully consumed
    return not pcount or len(self._delivered) - len(self._dirty) < pcount
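The prefetch limit checked here is set from the consumer side with Consumer.qos. A hedged usage sketch, reusing conn, video_queue and process_media from the official example above (the value 10 is arbitrary):

with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
    # allow at most 10 unacknowledged messages in flight on this channel
    consumer.qos(prefetch_count=10)
    while True:
        conn.drain_events()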
    
    
The _poll code:

def _poll(self, cycle, callback, timeout=None):
    """Poll a list of queues for available messages."""
    return cycle.get(callback)
And we are back at FairCycle again, this time the Channel's own FairCycle instance:

def _reset_cycle(self):
    self._cycle = FairCycle(
        self._get_and_deliver, self._active_queues, Empty)
_get_and_deliver takes a message out of the queue and then calls the _deliver method passed down from the Transport:

def _get_and_deliver(self, queue, callback):
    message = self._get(queue)
    callback(message, queue)
The _deliver code:

def _deliver(self, message, queue):
    if not queue:
        raise KeyError(
            'Received message without destination queue: {0}'.format(
                message))
    try:
        callback = self._callbacks[queue]
    except KeyError:
        logger.warning(W_NO_CONSUMERS, queue)
        self._reject_inbound_message(message)
    else:
        callback(message)
What it does is look up the callback registered for this queue and run it on the message, which in turn runs all of the Consumer's callbacks.

Recap
As you can see, Channel and Transport are the key players in Kombu: the Channel records the queue list, the consumer list and the mapping between them, while the Transport records the mapping from queues to callbacks. Kombu polls every queue in _active_queues until it has gone through all of them or finds one with a message available, fetches the message, and runs the callbacks registered for that queue.
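To tie the whole trace together, here is a minimal end-to-end sketch using the in-memory transport (no broker required; the exchange and queue names are illustrative):

from kombu import Connection, Exchange, Queue

demo_exchange = Exchange('demo', 'direct')
demo_queue = Queue('demo', exchange=demo_exchange, routing_key='demo')

def handle(body, message):
    print('got:', body)
    message.ack()

with Connection('memory://') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish({'hello': 'kombu'}, exchange=demo_exchange,
                     routing_key='demo', declare=[demo_queue])
    with conn.Consumer(demo_queue, callbacks=[handle], accept=['json']):
        conn.drain_events(timeout=1)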

 

Original article: http://www.cnblogs.com/kevin-zsq/p/16845368.html
