消息队列,AMQP 与 RabbitMQ(六) – 远程过程调用(Remote procedure call – PRC)

6. 远程过程调用

在第二个教程里面我们学习了如何使用工作队列来向多 worker 分布耗时的任务。

但是如果我们需要在远程计算机调用一个函数并且等待结果的时候呢?好吧,这是一个不同的故事。

这个模式被广泛定义为:RPC – Remote Procedure Call – 远程过程调用

在这个教程里面,我们将要使用 RabbitMQ 来构建一个 RPC 系统:一个客户端和一个可伸缩的 RPC 服务器。由于手上没有什么真正耗时的任务值得用来做分布式,我们将要创建一个很笨拙的 RPC 服务,用于返回 Fibinacci 数。

客户端接口

为了说明 RPC 服务怎么用法,我们要创建一个简单的客户端类,它会发出一个名叫 call 的方法来发出远程调用请求,然后阻塞直到收到结果。

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
关于 RPC 的提示

RPC 是一个很普通但是备受争议的计算模式。当编程者不了解一个方法是本地的还是一个很慢的 RPC 的时候,就会出问题。由此引起的混乱会导致一个无法预料的系统,并且加入了很多不必要的调试复杂性。这样的话,错误使用 RPC 就很容易产生无法维护的意大利面条式代码,而不是简化了软件。

默默记住这些,然后考虑下面的一些建议:

  • 明确地表示哪些函数是本地的,哪些是远程的;
  • 为系统编写文档,充分明确组件之间的依赖;
  • 处理错误的情况,如果 RPC 长时间卡死应该怎么办;

如果你没有搞清楚,就避免使用 RPC。如果你可以,你应该使用一个异步的管道,而不是 RPC 式的挂起,结果应该异步推送到下一个计算环节。

回调队列(Callback queue)

一般通过 RabbitMQ 来做 RPC 是很简单的。客户端发送一个请求消息然后服务端回复一个响应消息。为了让客户端收到响应,我们需要在客户端请求的时候发送一个 callback 队列地址,我们来试试:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=callback_queue,
        ),
    body=request)

# 下面写一些代码来从 callback_queue 读取响应
消息的属性

AMQP 协议预定义了一个 14个属性集合可以随消息一起发送。大部分都很少使用,除了下面这些:

  • delivery_mode: 表示了一个消息是持续的(用 2 表示)还是短暂的(任意其他值)。你可能在 第二个教程 里面记得有这个属性;
  • content_type: 描述编码的 mime 类型,例如经常使用的 JSON 我们会设置这个属性为 application/json
  • reply_to: 用来命名一个回调队列
  • correlation_id: 用于使远程调用的请求和响应相关联

关联编号(Corrlation id)

在上面表示的方法,我们建议每个 RPC 请求创建一个回调队列。这是相当低效的,但是我们有一个更好的方法,就是我们对于每个 Client 客户端只建立一个单独的回调队列。

那么问题来了,从回调队列里面接收消息,是不能充分清晰地得出这个响应是哪个请求发出的。这就是 correlation_id 的用武之地了。我们后面将要为每个请求发送一个唯一的值,然后根据这个,我们就可以匹配一个响应和一个请求。如果我们看到一个未知的 correlation_id,我们就可以放心地将其删掉,因为它不属于我们的请求。

你可能会问,为什么我们要忽略回调队列里面的未知消息,而不是抛出一个错误呢?这是因为这很可能会导致一个服务器端的条件竞争。尽管概率不大,但是有可能 RPC 服务器会在发送响应之后死掉了,但是却没有来得及发送一个 acknowledgement 回执给 request,这样的话,重启的 RPC 服务就回重新处理这个请求,这就是为何在客户端我们必须妥善处理重复的响应,而且 RPC 必须实现理想的幂等性(调用一次和调用多次应当导致同样的结果)。

摘要

python-six

我们的 RPC 会看起来像这样:

  • 当客户端开启,它创建一个匿名回调队列
  • 对于一个 PRC 请求,客户端发送一个带有两个属性的消息:reply_to 为回调队列, correlation_id 则为每次请求生成一个唯一值
  • 请求会发送给一个 rpc_queue 队列
  • RPC worker(即 server)正在等待队列上面的请求,当一个请求出现,它执行工作然后将结果发送一个消息到回调队列(reply_to)返回给客户端
  • 客户端等待回调队列里面的数据,档一个数据出现,它查看 correlation_id 属性,如果它匹配到请求的值,他就把结果响应给应用

总结

这是 rpc_server.py

#!/usr/bin/python3.5
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0: return 0
    elif n == 1: return 1
    return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)
    print('[.] fib(%s)' % n)
    response = fib(n)
    ch.basic_publish(exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id),
        body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print('[x] Awaiting RPC requests')
channel.start_consuming()
  • 我们先如常创建一个链接以及声明一个队列
  • 然后我们定义了 fibinacci 函数,注意只接受整数输入(注意输入不要太大,超过 40 估计就跑不出来了,因为递归算法超慢)
  • 我们定义了一个回调 basic_consume,这是 PRC 服务的核心,它会在 request 到达的时候执行,它工作之后发送响应;
  • 我们可能想会更多的服务器进程,为了更好的分配负载我们需要设置 prefetch_count 设置

这是 rpc_client.py

#!/usr/bin/python3.5
import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
            queue=self.callback_queue)

    def on_response(self, ch, method, props, body):

        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n): 
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
                ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

while True:

    n = int(input())

    if n <= 0: break

    print('[x] Requesting fib(%d)' % n)

    response = fibonacci_rpc.call(n)

    print('[.] Got %r' % response)

我们可以这样启动服务:

$ ./rpc_server.py
[x] Awaiting RPC requests

然后我们开启客户端:

$ ./rpc_client.py
30
[x] Requesting fib(30)
[.] Got 832040
20
[x] Requesting fib(20)
[.] Got 6765

现在我们的设计不是 RPC 服务的唯一实现,但是它有一些重要的优势:

  • 如果 PRC 服务太慢,完全可以多开实现扩展
  • 在客户端, PRC 要求只发送和接收一个消息,queue_declare 不需要同步。于是 RPC 客户端只需要一个工作网络即可。

我们的代码还是相当简单,并没有试图去解决一些更复杂(但很重要)的问题,例如:

  • 如果没有服务端,客户端如何响应?
  • 客户端对 RPC 是否需要设置超时?
  • 如果服务端跑挂了抛出异常,是否应该送回给客户端?
  • 在处理之前是否需要处理不合法的消息(例如边界检查)?

如果你想试验一下,可以查找 rabbitmq-management 插件,用于查看队列非常有用。

后记

更多知识:RabbitMQ的几种应用场景


【转载请附】愿以此功德,回向 >>

原文链接:https://www.huangwenchao.com.cn/2015/11/amqp-rabbitmq-6.html【消息队列,AMQP 与 RabbitMQ(六) – 远程过程调用(Remote procedure call – PRC)】

发表评论

电子邮件地址不会被公开。 必填项已用*标注