消息队列,AMQP 与 RabbitMQ(三) – 发布/订阅(Publish/Subscribe)

3. 发布/订阅(Publish/Subscribe)

前面我们创建了一个工作队列,这里我们假设每个任务只会发送给一个 worker,这次我们来干点完全不一样的,我们会将一条消息发给多个消费者,这个模式被称作“发布/订阅”。

为了说明这个模式,我们要创建一个简单的日志系统,它包括了两个程序:第一个会发射日志消息,而第二个会接收并且打印它们。

在我们的日志系统每个运行的接收副本(worker)都会(分别)接收到消息。这样我们就可以用一个接收器来将日志写到硬盘上,然后用另一个接收器将日志打印在屏幕。

本质上说,发布(publish)日志就是要将它广播给接收器们。

交换器(Exchanges)

现在我们来介绍一下完整的消息模型:

  • producer 是发送消息的应用
  • queue 是一个存储消息的缓冲器
  • consumer 是一个接收消息的应用

在 Rabbit 的关键思想中,一个 producer 是不应该直接将消息发给队列的。

交换器 Exchange 是一个非常简单的东西,一方面它接受 producer 的消息,另一方面它将消息压到队列里面去。交换器必须清晰知道应该要对接收到的消息做些什么:将它交给一个特定的队列?是否要发送给多个队列?或者将其忽略掉?这些规则是由交换器的类型 Exchange Type 所决定的。

exchanges

有几种可用的交换器类型:directtopicheadersfanout 四种。我们先看最后一种 fanout,我们先来创建一个这种类型的 exchange,叫做 log:

channel.exchange_declare(exchange='logs', type='fanout')

这个交换器很简单,你可以从名字上面猜到,它仅仅将所有收到的消息广播到它所知道的所有队列中,我们现在正需要一个这样工作的交换器。

查看交换器
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

在这里有一些 amq.* 交换器,它们是默认被创建的,但是目前不见得用得上。

匿名交换器

前面的部分我们对交换器知之甚少,但是我们依然可以将消息发送给队列。我们之所以能够这样做,是因为我们在使用一个默认的交换器,通过指定一个空字符串:

channel.basic_publish(exchange='',
    routing_key='hello',
    body=message)

这个 exchange 参数就是交换器的名称,空的字符串制定了使用默认交换器或者说匿名(nameless)交换器:消息会被路由至 routing_key 所指定的队列,如果这个队列存在。

现在,我们就可以用同样的方式来命名我们的交换器:

channel.basic_publish(exchange='logs',
    routing_key='',
    body=message)

临时队列(Temporary queues)

考虑到我们之前给了队列一个名字 (hello),队列的名字很关键,因为我们需要让 workers 用来区分不同的队列。为了能让生产者和消费者共享一个队列,给一个名字很重要。

但是我们现在的 logger 并不需要如此,我们希望侦听所有的 log 消息,而不仅仅是一部分,我们也对刚刚产生的消息感兴趣,而不是旧的。为了解决这些我们需要两样东西:

首先, 我们连接到 Rabbit 的时候,需要一个新的清空的队列,我们会创建一个随机名称的队列(或者干脆让服务器自动给我们随机起一个名),我们可以通过在定义队列的时候缺省名称来做到这一点:

result = channel.queue_declare()

在这个时候 result.method.queue 包含了一个随机的队列名次。大概会长这个样:amq.gen-JzTY20BRgK0-HjmUJj0wLg

第二,当我们断开消费者的连接的时候,我们这个队列应当被删除。这里有一个 exclusive 标记让我们来做这个:

result = channel.queue_declare(exclusive=True)

绑定(Binding)

我们已经创建了一个扇出交换器和一个队列,现在我们需要告诉交换器把消息发送给我们的队列。这种存在于交换器和队列之间的关系我们叫做绑定:

bindings

channel.queue_bind(exchange='logs',
    queue=result.method.queue)

现在开始 log 交换器就会把消息加入我们的队列。

我们可以通过 rabbitmqctl list_bindings 来列出所有存在的绑定。


总结

我们现在用来发送 log 消息的生产者程序和我们之前看到没有多大不一样。最大的改变在于我们现在要把消息发送给一个交换器而不是留空。我们发送时需要提供一个 routing_key,但是当交换器的类型是扇出(fanout)的时候它的值会被忽略。这里就是发送的脚本 emit_log.py

#!/usr/bin/python3.5

import pika
import sys 

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

message = ''.join(sys.argv[1:]) or 'info: Hello World!'
channel.basic_publish(exchange='logs',
    routing_key='',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2, # 消息持久化
    ))  
print(" [x] Sent %r" % message)
connection.close()

你可以看到,在我们创建连接之后我们定义了一个交换器。这一步是必要的因为发布消息到不存在的交换器是不允许的。

如果我们还没有将队列绑定到这个交换器,所有发过来的消息都会丢弃,但这正是我们希望的:如果没有消费者在侦听,我们可以很放心地舍弃这些信息。

然后是我们的接收端 receive_logs.py

#!/usr/bin/python3.5

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(' [x] %r' % body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

这就完成了,如果你想将消息保存到一个文件里面,只需要灵开一个终端然后执行:

./receive_logs.py > logs_from_rabbit.log

如果想将所有的消息显示在屏幕上,打开一个新的终端然后运行:

./receive_logs.py

当然,发送消息这样来做:

./emit_log.py Your messages

使用 rabbitmqctl list_bindings 你可以确认这些代码的确创建了我们想要的绑定和队列。


【转载请附】愿以此功德,回向 >>

原文链接:https://www.huangwenchao.com.cn/2015/11/amqp-rabbitmq-3.html【消息队列,AMQP 与 RabbitMQ(三) – 发布/订阅(Publish/Subscribe)】

发表评论

电子邮件地址不会被公开。 必填项已用*标注