消息队列,AMQP 与 RabbitMQ(二) – 工作队列(Work queues)

2. 工作队列(或任务队列)

工作队列的目标是避免立即处理一个资源密集的任务,然后必须等待其结束。换之以将这个任务加到计划执行的列表中。我们将一个任务封装成一个消息,然后将其塞到队列里面去。一个 worker 进程就会在后台运行,然后从这个队列里面弹出任务,并最终将其处理掉。如果有多个 worker 去做这个事情,这些 worker 就会分别将这些任务搞定。

python-two

这个概念在你希望通过一个短暂的 HTTP 请求来处理一个复杂的任务时非常有用(例如 ACM 提交到判题这样的任务就尤其适合)。

我们将前面的一些公共的定义代码放到 base.py 里面先:

# base.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

然后我们定义一个创建任务(插入消息)的脚本 new_task.py,用来替代上面的 send.py 的工作:

#!/usr/bin/python3.5
# new_task.py

from base import connection, channel
import sys

message = ''.join(sys.argv[1:]) or 'Hello World!'
channel.basic_publish(exchange='',
    routing_key='hello',
    body=message)
print(" [x] Sent %r" % message)
connection.close()

然后,相应地,如果我们调用这个脚本,那么参数系列会被串起来传到队列中:

./new_task.py "this is a message called:" hello

然后我们也来将 receive.py 改成 worker.py,我们让其处理一个消息时妆模作样地要延续数秒,秒数取决于消息内容中点字符 . 出现的次数。

#!/usr/bin/python3.5
# worker.py

from base import connection, channel
import time

def callback(ch, method, properties, body):
    print(' [x] Received %r' % body)
    time.sleep(body.decode().count('.'))
    print(' [x] Done')

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.basic_consume(
    callback,
    queue='hello',
    no_ack=True)

channel.start_consuming()

有了这个之后,我们可以在终端同时开启两个或者多个 worker:

./worker.py

然后连续发送几个任务:

./new_task ..Hello

这样我们可以看到几个终端会每隔两秒(因为参数 body 是 ..Hello,里面包含了两个点符号)并行而且交替地获取信息,并且进行处理输出。

这种任务的方式就叫做轮询调度 – Round Robin

这种调度方式很机械地将任务依次指派给不同的 worker,就像派扑克一样。

消息确认

很容易会想到一个问题:如果一个 consumer (worker.py) 在执行一个任务到一半的时候跑死了,这个消息就会丢失。

但是我们不希望这个结果,我们希望一个 worker.py 在处理一个消息中途死掉了,这个消息可以再重新交给另一个 worker 进行处理。

于是,RabbitMQ 支持消息的确认,acknowlegement,类似于回执发送,一个 ack 会在 consumer 处理完消息之后发回给 RabbitMQ,这个时候 RabbitMQ 才会正式从队列删除这个消息。

反之,如果 worker 中途跑死,没有发回 ack,RabbitMQ 就知道这个消息没有完全处理,就会将其转发给另一个 comsumer,这样你就可以信任即使 worker 偶尔跑死,也不会发生消息的丢失。

RabbitMQ 确认 worker 中断是通过 connection 的中断确认的,而不是超时,所以即使时间很长,只要连接没断,就会继续等待。

默认情况下,acknowlegement 是开启的,但是我们前面的例子通过 no_ack=True 将其关闭了,我们现在将 worker.py 改写一下,让 ack 生效:

def callback(ch, method, properties, body):
    print(' [x] Received %r' % body)
    time.sleep(body.decode().count('.'))
    print(' [x] Done')
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback, queue='hello')

可以看到,是否开启 ack 取决于 worker,而不是 RabbitMQ,因此可以认为,如果关闭了 ack,就相当于一接收到信息没等处理完就 ack 回去,让 RabbitMQ 将这个消息删掉。

注意到这个 basic_ack 的调用很容易忘记写,这样的话,只要连接存续,rabbitmq 就不会真正地丢弃消息,并且 worker 连接断开的时候,所有处理过的消息都会被重新发送!这样被 hold 住的消息就会慢慢吃光你的内存!

通过下面的命令可以查看到被 hold 住的消息数量:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

消息持久化(Message Durability)

然而,当 rabbitmq-server 停止的时候,消息还是会丢失。

默认情况下 rabbitmq-server 会在退出的时候清掉消息和队列,为了在这种情况下保留消息和队列,我们需要将消息和队列都分别声明为持久化的(durable)。

channel.queue_declare(queue='hello', durable=True)

注意,如果我们以前已经声明过这个队列,再去声明不会起效果(也就是说先声明一个非持久化的队列,再声明一次同样名称的队列为持久化,后面的不会起效。)

我们重启一下 rabbitmq 即可让原来的队列消失:

service rabbitmq-server restart

同时,这段声明应当同时对生产端和消费端执行。

另外我们也许要让消息也持久化:

channel.basic_publish(exchange='',
                      routing_key="hello",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

上面的 delivery_mode = 2 即指定了这条消息为持久化的。

公平的指派

注意到如果两个 worker,通过 round-robin 的指派方式,RabbitMQ 会在消息入队的时候就指派好 worker,而不会照顾 worker 的压力。(注意,如果是这样,出队的顺序是完全不依赖于入队的顺序的!)

我们可以通过下面的方式来改变这个行为,让 worker 完成一个任务,返回 ack 之后再向其指派下一个任务,而不是一进队的时候就进行指派:

channel.basic_qos(prefetch_count=1)

最终版本

base.py

#!/usr/bin/python3.5

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

new_task.py

#!/usr/bin/python3.5

from base import connection, channel
import pika
import sys 

message = ''.join(sys.argv[1:]) or 'Hello World!'
channel.basic_publish(exchange='',
    routing_key='hello',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2, # 消息持久化
    ))  
print(" [x] Sent %r" % message)
connection.close()

worker.py

#!/usr/bin/python3.5

from base import connection, channel
import time

def callback(ch, method, properties, body):
    print(' [x] Received %r' % body)
    time.sleep(body.decode().count('.'))
    print(' [x] Done')
    ch.basic_ack(delivery_tag=method.delivery_tag)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello')
channel.start_consuming()

【转载请附】愿以此功德,回向 >>

原文链接:https://www.huangwenchao.com.cn/2015/11/amqp-rabbitmq-2.html【消息队列,AMQP 与 RabbitMQ(二) – 工作队列(Work queues)】

《消息队列,AMQP 与 RabbitMQ(二) – 工作队列(Work queues)》有1个想法

发表评论

电子邮件地址不会被公开。 必填项已用*标注