消息队列,AMQP 与 RabbitMQ(五) – 主题(Topics)

主题(Topic)

前面我们已经改进了我们的日志系统,我们使用了一个直接的交换器代替了一个机械笨拙的扇出交换器,然后让我们在接收日志的时候可以有一定的选择性了。

尽管使用 direct 选择器改进了我们的系统,但还是有一些限制:它不能基于多个标准来路由。

在我们的日志系统,我们可能不单单想根据严重性来订阅,还想根据日志的发送方来进行筛选。你可能已经从 syslog 这个 unix 工具里面知道这个概念,基于严重性(info/warn/crit…)和发送的设备(auth/cron/kern…)进行路由。

这可以让我们有更多的自由度,我们可能希望仅仅侦听来自 cron 的严重错误,以及来自 kern 的所有日志。

为了在我们的日志系统里面实现这些,我们需要学习更为复杂的 topic 交换器。

主题交换器(Topic exchange)

发送给 topic 交换器的消息的 routing_key 不可以随便指定,而必须是用 . 符号分隔的一组单词,这些单词可以随便指定,但是通常都会指定与消息相关的一些特性。合法的格式大概会形如:stock.usd.nysequick.orange.rabbit。在 255 字节的限制下你可以放任意多的单词。

绑定的 key 也必须遵循这种形式。topic 交换器背后的逻辑和 direct 相似,一个消息会路由给有绑定和消息匹配的那些队列,但是,有两个特殊的绑定关键字:

  • * 可以代替一整个单词
  • # 可以代替零个或者多个单词

下图可以很好地解释这个逻辑:

python-five

在这个例子里面,我们会发送描述动物的消息。消息会发送一个包含三个单词(两个点)的 routing_key,第一个单词会描述敏捷性,第二个描述颜色,第三个描述物种。

我们创建了三个绑定:Q1 绑定了 *.orange.*Q2 绑定 *.*.rabbit 以及 lazy.#

这些绑定可以这样来总结:

  • Q1 关心所有橙色的动物;
  • Q2 关心兔子和所有懒的动物;

一条通过 quick.orange.rabbit 的消息会被同时路由到两个队列中,但是 quick.orange.fox 就只会发送给第二个队列,并且只发送一次。lazy.pink.rabbit 会发送给第二个队列,只发送一次,尽管它匹配了两个绑定。quick.brown.fox 并不匹配任何绑定,所以它会被丢弃。

再来看看 lazy.orange.male,rabbit,尽管它有四个单词,它也会匹配最后一个绑定,并且发送给第二个队列。

Topic 交换器功能强大,而且可以像其他交换器一样使用。 当一个队列用 # 绑定,他就会发射所有消息,这就跟 fanout 一样; 如果在绑定上不使用通配符,则实际的用法就和 direct 一样;

总结

现在我们要在日志系统里面使用 topic 交换器,我们会假设我们消息的 routing key 会有两个单词:<facility>.<severity>

代码和上次的几乎一样:

首先是 emit_log_topic.py

#!/usr/bin/python3.5

import pika
import sys 

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'Hello world!'

channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2, # 消息持久化
    ))  
print(" [x] Sent %r: %r" % (routing_key, message))
connection.close()

然后是 receive_logs_topic.py

#!/usr/bin/python3.5

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(' [x] %r: %r' % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

要收到所有的日志:

./receive_logs_topic.py "#"

收到所有来自 kern 的所有消息:

./receive_logs_topic.py "kern.*"

或者只要收到严重性为 critical 的日志:

./receive_logs_topic "*.critical"

你可以创建多个绑定:

./receive_logs_topic.py "kern.*" "*.critical"

然后发送一个用 kern.critical 路由的消息:

./emit_log_topic.py "kern.critical" "A critical kernel error"

好好摆弄一下这些程序吧。注意这些代码并没有对路由或者绑定的 key 做任何的假设,你可以用来试试两个或多个参数的情况:

出几道题:

  • * 绑定会否捕捉一个空的 routing key?(实测不能)
  • #.* 是否会捕捉 .. 作为 key?(实测可以)能否捕捉到单个单词的 key?(实测可以)
  • a.*.#a.# 有何不同?(前者不能捕捉 a,后者可以)

【转载请附】愿以此功德,回向 >>

原文链接:https://www.huangwenchao.com.cn/2015/11/amqp-rabbitmq-5.html【消息队列,AMQP 与 RabbitMQ(五) – 主题(Topics)】

发表评论

电子邮件地址不会被公开。 必填项已用*标注