消息队列,AMQP 与 RabbitMQ(四) – 路由(Routing)

4. 路由(Routing)

在前面的教程里面我们构建了一个简单的日志系统。我们现在可以讲日志消息广播给多个接收者。

在这个教程里面我们要在这个基础上加一些特性,我们要令仅仅订阅一个消息的子集成为可能。例如,我们要把严重错误的消息写到日志文件里面(节省空间),但是仍然要把所有的消息显示在控制台中。

绑定

前面我们已经创建过帮顶了,我们会这样来写:

channel.queue_bind(exchange=exchange_name, queue=queue_name)

绑定是存在于交换器和队列之间的关系。换而言之:队列对交换器的消息感兴趣。

在我们的绑定定义里面,我们可以再给一个 routing_key 的参数进去。为了避免和 basic_publish 方法里面的参数搅浑,我们后面将其称作绑定关键字(binding key)下面是我们定义的方法:

channel.queue_bind(exchange=exchange_name,
    queue=queue_name,
    routing_key='black')

这个绑定 key 的含义会根据交换器类型而不一样,对于扇出交换器,就会忽略掉这个参数。

直接交换(Direct Exchange)

我们前面做的日志系统会把所有的消息都发给所有的消费者。我们希望扩展这个,使得消息能够按照服务性质进行过滤。例如我们可能希望只将严重错误的消息记录到硬盘中,以节省空间。

这次我们使用一个直接交换器(direct exchange)。它的路由算法很简单:一个指定 binding_key 的消息发送时,会指派到 binding_key 匹配的那些绑定的队列中去。

为了说明这个问题,看图:

direct-exchange

在这个设置里面,看一下直接交换器 X 有两个队列绑定在上面,第一个队列通过 orange 关键字绑定,第二个通过 black 和 green 这两个进行绑定。

在这样一个配置之下,一个用 orange 作为路由关键字的消息,会被发送到 Q1 队列,而 routing key 为 black 或者 green 的消息就会发送到 Q2 队列,其他的所有消息都会被丢弃。

多绑定(Multiple bindings)

direct-exchange-multiple

将一个交换器通过完全相同的绑定名称绑定不同的队列是完全合法的。在前面的例子基础上,我们还可以在 X 和 Q1 之间添加一个 black 绑定,这样一来,我们发送的 routing key 为 black 的消息,行为就好像扇出交换器一样,他们会被同时分发给 Q1 和 Q2。

发射日志(Emitting logs)

我们会把这个模型用在我们的日志系统中。这次我们不用扇出交换器,转而使用一个直接交换器。我们会提供日志的严重性作为 routing key。这样的话接收脚本就可以选择它所关心的严重性来接收,让我们先看一下如何发射日志。

和往常一样,我们先创建一个交换器:

channel.exchange_declare(exchange='direct_logs', type='direct')

然后我们已经可以发送一条消息了:

channel.basic_publish(exchange='direct_logs',
    routing_key=severity,
    body=message)

为了简化这些事情,我们限定 severity 会在 info / warning / error 中任取一个。

订阅(Subscribing)

接收消息会和上一节的教程一样,唯一的不同是我们会根据我们所选择的严重性进行绑定:

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
        queue=queue_name,
        routing_key=severity)

总结

python-four

这是 emit_log_direct.py:

#!/usr/bin/python3.5

import pika
import sys 

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[2:]) or 'Hello world!'

channel.basic_publish(exchange='direct_logs',
    routing_key=severity,
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2, # 消息持久化
    ))  
print(" [x] Sent %r: %r" % (severity, message))
connection.close()

这是 receive_logs_direct.py:

#!/usr/bin/python3.5

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
        queue=queue_name,
        routing_key=severity)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(' [x] %r: %r' % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

现在就可以根据需要来开启队列,如果我们想将 warning 和 error 的消息存放到一个文件:

./receive_logs_direct.py warning error > logs_from_rabbit.log

如果我们想打印所有消息到屏幕:

./receive_logs_direct.py

发射消息的方法,以发射一个错误日志为例:

./emit_log_direct.py error "Run. Run. Or it will explode."

【转载请附】愿以此功德,回向 >>

原文链接:https://www.huangwenchao.com.cn/2015/11/amqp-rabbitmq-4.html【消息队列,AMQP 与 RabbitMQ(四) – 路由(Routing)】

发表评论

电子邮件地址不会被公开。 必填项已用*标注