在前一篇博文《Dive Into Protocol Buffers Python API》中对protobuf的Python API的代码进行了分析。现在进入实践阶段,利用protobuf的service
API实现一套异步RPC机制。
严谨起见,从wikipedia上摘录下一般情况下一次RPC调用的过程:
- The client calls the client stub. The call is a local procedure call, with parameters pushed on to the stack in the normal way.
- The client stub packs the parameters into a message and makes a system call to send the message. Packing the parameters is called marshalling.
- The client’s local operating system sends the message from the client machine to the server machine.
- The local operating system on the server machine passes the incoming packets to the server stub.
- The server stub unpacks the parameters from the message. Unpacking the parameters is called unmarshalling.
- Finally, the server stub calls the server procedure. The reply traces the same steps in the reverse direction.
上面过程中的第1和第6步已经由protobuf的service
API为我们实现好了,我们只需要在proto文件中定义所需的具体调用接口即可。
对于第2和第5步的marshalling和unmarshalling步骤,service
API虽然没有为我们完全实现,但是protobuf为方法以及参数已经准备好了完善的serialization的机制,我们只需要自己决定如何用这些序列化的数据拼装数据包即可。
最后,第3和第4步的通信机制则是完全需要由我们自己来实现的,这也是protobuf设计的初衷,在最多变的部分(多种多样的网络结构、协议和通信机制)留出足够的空间让程序员可以针对特定场景自己实现,使得protobuf可以应用在更多的场景。
回到标题所说的Asynchronous RPC。一次函数调用通常包含了输入和输出两个过程。对于RPC来说,我们可以像大多数本地函数那样,在进行调用之后一直等待,直到计算结果返回才继续向下执行。但是由于网络传输的过程相对比较耗时,采取这样的策略无疑是非常低效的。因此我们采取另外一种策略:调用者发送调用请求之后不等待结果的返回就立即继续执行后续的操作,当收到RPC返回的计算结果之后再回来处理。这里将前一种策略称为Synchronous RPC,而后一种就是本文要实现的Asynchronous RPC。
实现的方式其实也很简单,就是把客户端发起的一次RPC调用拆分成两次来处理:首先由客户端发起RPC调用,之后无需等待继续向后执行;而服务端接收到RPC调用请求并处理完成之后,再向客户端发起另外一次RPC调用,将计算结果通过参数通知客户端。
关于RPC需要说明的东西大概就到这里,接下来我们首先解决第3和第4步的通信机制的实现。
实现通信层
我们选择使用asyncore和TCP协议实现RPC的通信层。关于asyncore的具体用法可以参考asyncore的文档。
首先将端到端的链接和传输抽象出来,一个端到端的通信可以用下面这样一个TcpConnection
来进行封装:
1 | class TcpConnection(asyncore.dispatcher): |
客户端负责主动向服务端发起连接请求,在请求成功后维护自己到服务端的一条连接。因此我们可以通过继承TcpConnection
并增加connect
行为得到通信的客户端:
1 | class TcpClient(TcpConnection): |
服务端则负责监听并接受客户端的连接请求,并为每一个客户维护一条连接:
1 | class TcpServer(asyncore.dispatcher): |
至此,我们就完成了一个简陋但有效的C/S模式的通信层。
实现Echo服务
有了通信层,我们就可以继续向下进行。既然是RPC,那么就不可能脱离具体的业务,因此这里以经典的Echo服务为例,利用protobuf实现RPC。
我们为Echo定义proto如下:
1 | package nightfade; |
如前所述,因为要实现的是Asynchronous RPC,所以RPC调用分为两部分:
客户端首先调用echo
,之后服务端接收到RPC请求并处理之后再调用respond
将结果通知客户端。
使用protoc编译proto文件以及对生成的文件的分析这里就不在赘述,可以参考《Dive Into Protocol Buffers Python API》。这里需要关注的问题有两个:
- 如何实现Service。
- 如何将实现好的Service与我们的通信层关联起来。
因为Echo服务本身非常简单,所以第一个问题可以轻易解决:
1 | class EchoService(IEchoService): |
接下来我们需要考虑的就是与通信层的关联问题。
要将protobuf的service
与通信层关联的关键在于RpcChannel
。
首先来看调用端这一边。
调用端通过stub对RPC过程的调用最终会转向对RpcChannel.CallMethod()
的调用,而这个方法也正是protobuf留给我们实现调用端进行marshalling和数据发送的地方。这样一来问题就很容易解决了,我们为RpcChannel实现CallMethod
方法:
- 无论是调用端还是被调用端,一个
method_descriptor
在其所在Service内的index是一致的。因此method_descriptor的部分只需要对其index进行marshalling即可。 - RPC调用的参数可以直接使用protobuf的
SerializeToString()
方法进行marshalling,进而在接收端通过ParseFromString()
方法unmarshalling。 - 数据包的Framing问题,则使用一个简单的方案:在数据包之前发送一个32位整数的HEAD用来告知接收端后续数据包的大小。
具体实现来看代码:
1 | class RpcChannel(service.RpcChannel): |
接下来实现被调用端。
protobuf的service
API在被调用端为我们完成的工作是,当使用合适的method_descriptor
和request
参数调用IEchoService.CallMethod()
时,会自动调用我们对相应方法接口的具体实现。因此在服务端需要做的工作主要由:
- 接受调用端发来的数据。
- 对接收到的数据包进行unmashalling,解析得到
method_descriptor
和request
参数。 - 调用
EchoService.CallMethod()
。
我们实现的TcpConnection
可以完成接受数据的工作,只是还没能与后续的步骤关联起来。既然marshalling的工作是由RpcChannel
来完成的,unmarshalling的功能我们也同样在RpcChannel
中实现,为其增加receive
方法。当TcpConnection
接受到数据之后,就交给RpcChannel.receive
进行处理。
1 | def receive(self, data): |
其中rpc_parser
负责将数据流unmarshalling成一系列的method_descriptor
和request
参数,具体实现就不再贴代码了。service_local
则是服务端提供的服务EchoService
。
至此,我们的整个RPC调用的的基本实现就已经完成了!限于篇幅,所以只贴了一些代码片段,完整的代码可以查看我的repository:https://github.com/nightfade/protobuf-RPC。
其他
在这个RPC的实现中,其实还欠缺了一个重要部分RpcController
。这个部分是干什么用的呢?依然引用wikipedia的一段说明:
An important difference between remote procedure calls and local calls is that remote calls can fail because of unpredictable network problems. Also, callers generally must deal with such failures without knowing whether the remote procedure was actually invoked. Idempotent procedures (those that have no additional effects if called more than once) are easily handled, but enough difficulties remain that code to call remote procedures is often confined to carefully written low-level subsystems.
简单来说,RPC过程总是可能由于网络问题等不可预测的原因出错的,我们需要有一种途径来捕获并处理RPC过程中所发生的错误。RpcController
就是为此而存在的,它定义了一些常用的错误处理的抽象接口,可以根据具体的场景进行实现。
鉴于RpcController
的定义非常简单明确,并且是和具体场景紧密关联的,这里就不在上面花费更多精力了。以后业务逻辑逐渐复杂的时候,再根据需要case by case的进行实现即可。