JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议。JSON-RPC应用很广泛,比如以太坊的API。JSON-RPC的python实现较多,我选择了Exploding Labs 提供的python版本。主要是其它库都比较古老,而e-labs的实现采用最新版本python,支持类型系统,还有一些函数式编程的范式,代码也很简洁,值得学习。
e-labs的JSON-RPC分成客户端和服务端两个库,分别是jsonrpcclient和jsonrpcserver,
代码版本如下表:
名称 |
版本 |
jsonrpcclient |
4.0.2 |
jsonrpcserver |
5.0.9 |
准备好代码后,我们可以开始json-rpc的源码阅读,本文包括下面几个部分:
- JSON-RPC规范
- jsonrpcclient的实现
- jsonrpcserver的实现
- 小结
- 小技巧
JSON-RPC规范
JSON-RPC规范,我这里借用jsonrpcserver中的验证规则文件简单介绍一下,文件如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# request-schema.json
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "A JSON RPC 2.0 request",
"oneOf": [
{
"description": "An individual request",
"$ref": "#/definitions/request"
},
{
"description": "An array of requests",
"type": "array",
"items": { "$ref": "#/definitions/request" },
"minItems": 1
}
],
"definitions": {
"request": {
"type": "object",
"required": [ "jsonrpc", "method" ],
"properties": {
"jsonrpc": { "enum": [ "2.0" ] },
"method": {
"type": "string"
},
"id": {
"type": [ "string", "number", "null" ],
"note": [
"While allowed, null should be avoided: http://www.jsonrpc.org/specification#id1",
"While allowed, a number with a fractional part should be avoided: http://www.jsonrpc.org/specification#id2"
]
},
"params": {
"type": [ "array", "object" ]
}
},
"additionalProperties": false
}
}
}
|
文件描述了JSON-RPC的规则,如下:
- json-rpc请求可以是单个的request对象,也是是批量的request对象数组
- 每个request对象需要符合:
- 必填字段jsonrpc,值枚举类型。目前2.0,其实就是版本号。(之前有1.0版本)
- 必填字段method, 字符串类型。远程函数的名称。
- id字段,支持字符串,数字或者空。为空表示通知无需回应(result)。id确保响应可以一一对应到请求上。
- params字段,支持数组或者字典。
JSON-RPC响应部分的规则是:
- jsonrpc字段,值为2.0
- result字段,值为调用结果
- error字段,值为异常信息,包括code,message和data三个字段,规范定义了详细的错误清单。
- id同请求的id
- result和error二选一
强烈建议大家阅读参考链接中的规范原文,介绍的非常清晰,中文翻译也很到位,有助于对JSON-RPC规范完全理解。
jsonrpcclient的实现
模块文件 |
功能描述 |
id_generators.py |
id生成器 |
requests.py |
请求信息封装 |
response.py |
响应信息封装 |
sentinels.py |
定义NOID,用于通知类请求 |
utils.py |
一些工具函数 |
examples |
一些示例 |
从示例可以知道JSON-RPC,可以使用不同的底层协议比如http,websocket和tcp(zeromq实现)等。我们看最简单的基于http实现的实例:
1
2
3
4
5
6
7
8
9
10
11
|
from jsonrpcclient import request, parse, Ok
import logging
import requests
response = requests.post("http://localhost:5000/", json=request("ping"))
parsed = parse(response.json())
if isinstance(parsed, Ok):
print(parsed.result)
else:
logging.error(parsed.message)
|
这段api展示了:
- jsonrpcclient只是封装请求request和响应Ok,数据请求的发送由不同协议提供,这里使用requests,另外还有aiohttp的实现等。
- resquest函数封装请求,parse解析响应
- 正常的结果展示result信息,错误的结果展示message信息
request代码很简单, 封装请求成符合JSON-RPC规范的字符串:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
# requests.py
def request_pure(
id_generator: Iterator[Any],
method: str,
params: Union[Dict[str, Any], Tuple[Any, ...]],
id: Any,
) -> Dict[str, Any]:
return {
"jsonrpc": "2.0",
"method": method,
**(
{"params": list(params) if isinstance(params, tuple) else params}
if params
else {}
),
"id": id if id is not NOID else next(id_generator),
}
def request_impure(
id_generator: Iterator[Any],
method: str,
params: Union[Dict[str, Any], Tuple[Any, ...], None] = None,
id: Any = NOID,
) -> Dict[str, Any]:
return request_pure(
id_generator or id_generators.decimal(), method, params or (), id
)
request_natural = partial(request_impure, id_generators.decimal())
...
request = request_natural
|
所以示例中的请求,可以等价下面的curl命令:
1
|
$ curl -X POST http://localhost:5001 -d '{"jsonrpc": "2.0", "method": "ping", "params": {}, "id": 1}'
|
response处理也很简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# response.py
class Ok(NamedTuple):
result: Any
id: Any
def __repr__(self) -> str:
return f"Ok(result={self.result!r}, id={self.id!r})"
class Error(NamedTuple):
code: int
message: str
data: Any
id: Any
def __repr__(self) -> str:
return f"Error(code={self.code!r}, message={self.message!r}, data={self.data!r}, id={self.id!r})"
Response = Union[Ok, Error]
|
定义Response类型,是Ok或者Error。Ok和Error是两个可命名元祖。
parse就是将结果json字典解析成对应的Response:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def to_result(response: Dict[str, Any]) -> Response:
return (
Ok(response["result"], response["id"])
if "result" in response
else Error(
response["error"]["code"],
response["error"]["message"],
response["error"].get("data"),
response["id"],
)
)
def parse(response: Deserialized) -> Union[Response, Iterable[Response]]:
return (
map(to_result, response) if isinstance(response, list) else to_result(response)
)
|
也可以直接使用parse_json函数,从json字符串生成结果:
1
|
parse_json = compose(parse, json.loads)
|
这里的map,componse等都是函数式编程。在server中函数式编程使用的更多,可见作者非常喜欢函数式编程的思想
jsonrpcserver的实现
jsonrpcclient实现非常简单,jsonrpcserver的实现会略微复杂点,但是还是可以很好的理解的,我们一起继续。jsonrpcserver的主要模块如下:
模块 |
描述 |
main.py/async_main.py |
main文件,分别是同步和异步版本 |
dispatcher.py/async_dispatcher.py |
rpc服务的分配器实现 |
methods.py |
rpc函数的装饰器 |
request.py |
请求处理 |
response.py |
响应处理 |
result.py |
结果处理 |
examplse |
一些示例 |
通用,我们先从示例入手,看看api的使用。下面是flask版本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
# flask_server.py
from flask import Flask, Response, request
from jsonrpcserver import method, Result, Success, dispatch
app = Flask(__name__)
@method
def ping() -> Result:
return Success("pong")
@app.route("/", methods=["POST"])
def index():
return Response(
dispatch(request.get_data().decode()), content_type="application/json"
)
if __name__ == "__main__":
app.run()
|
从示例我们可以知道,rpc服务其实就2大步骤:
- 使用method装饰ping函数,使它支持rpc调用,ping函数返回的是一个特点的Result数据结构
- 所有rpc调用的http-url都是根目录,服务使用dispatch调度rpc请求
先看第一步rpc装饰器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# methods.py
Method = Callable[..., Result]
Methods = Dict[str, Method]
global_methods = dict()
def method(
f: Optional[Method] = None, name: Optional[str] = None
) -> Callable[..., Any]:
"""A decorator to add a function into jsonrpcserver's internal global_methods dict.
The global_methods dict will be used by default unless a methods argument is passed
to `dispatch`.
Functions can be renamed by passing a name argument:
@method(name=bar)
def foo():
...
"""
def decorator(func: Method) -> Method:
nonlocal name
global_methods[name or func.__name__] = func
return func
return decorator(f) if callable(f) else cast(Method, decorator)
|
- 将所有的rpc函数都封装到global_methods字典中
- 函数需要返回Result类型
第2步中,main模块提供了dispatch的api,主要就是下面的函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# main.py
def dispatch_to_response(
request: str,
methods: Optional[Methods] = None,
*,
context: Any = NOCONTEXT,
deserializer: Callable[[str], Deserialized] = json.loads,
validator: Callable[[Deserialized], Deserialized] = default_validator,
post_process: Callable[[Response], Any] = identity,
) -> Union[Response, List[Response], None]:
"""Takes a JSON-RPC request string and dispatches it to method(s), giving Response
namedtuple(s) or None.
This is a public wrapper around dispatch_to_response_pure, adding globals and
default values to be nicer for end users.
Args:
request: The JSON-RPC request string.
methods: Dictionary of methods that can be called - mapping of function names to
functions. If not passed, uses the internal global_methods dict which is
populated with the @method decorator.
context: If given, will be passed as the first argument to methods.
deserializer: Function that deserializes the request string.
validator: Function that validates the JSON-RPC request. The function should
raise an exception if the request is invalid. To disable validation, pass
lambda _: None.
post_process: Function that will be applied to Responses.
Returns:
A Response, list of Responses or None.
Examples:
>>> dispatch('{"jsonrpc": "2.0", "method": "ping", "id": 1}')
'{"jsonrpc": "2.0", "result": "pong", "id": 1}'
"""
return dispatch_to_response_pure(
deserializer=deserializer,
validator=validator,
post_process=post_process,
context=context,
methods=global_methods if methods is None else methods,
request=request,
)
|
- request 请求的函数名称
- methods 可供调用的函数集合,默认就是之前rpc装饰器中存储的global_methods
- deserializer 请求的反序列化函数,validator请求验证器
- post_process响应处理函数
post_process主要就是根据结果类型,分别取不同的字段并序列化:
1
2
3
4
5
6
|
def to_serializable_one(response: ResponseType) -> Union[Deserialized, None]:
return (
serialize_error(response._error)
if isinstance(response, Left)
else serialize_success(response._value)
)
|
dispatch的实现,主要是下面2个函数dispatch_request和call,前者查找rpc函数,后者执行rpc函数。dispatch_request内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
def dispatch_request(
methods: Methods, context: Any, request: Request
) -> Tuple[Request, Result]:
"""Get the method, validates the arguments and calls the method.
Returns: A tuple containing the Result of the method, along with the original
Request. We need the ids from the original request to remove notifications
before responding, and create a Response.
"""
return (
request,
get_method(methods, request.method)
.bind(partial(validate_args, request, context))
.bind(partial(call, request, context)),
)
|
这里使用了oslash这个函数式编程库,我们可以简单的使用unix的管道思想去理解:
- 使用get_method查找rpc响应函数
- 使用validate_args验证rpc请求
- 使用call执行rpc调用
- 3个步骤依次执行,前者的返回值会作为后缀的参数
重中之重是call函数,原理非常简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
def call(request: Request, context: Any, method: Method) -> Result:
"""Call the method.
Handles any exceptions raised in the method, being sure to return an Error response.
Returns: A Result.
"""
try:
result = method(*extract_args(request, context), **extract_kwargs(request))
# validate_result raises AssertionError if the return value is not a valid
# Result, which should respond with Internal Error because its a problem in the
# method.
validate_result(result)
# Raising JsonRpcError inside the method is an alternative way of returning an error
# response.
except JsonRpcError as exc:
return Left(ErrorResult(code=exc.code, message=exc.message, data=exc.data))
# Any other uncaught exception inside method - internal error.
except Exception as exc:
logger.exception(exc)
return Left(InternalErrorResult(str(exc)))
return result
|
- 使用args和kwargs动态执行rpc函数,并将结果进行返回
- 捕获异常,返回标准错误
这里的Left是函数式编程中的概念,我们可以从response的实现,简单了解一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# response.py
class SuccessResult(NamedTuple):
result: Any = None
class ErrorResult(NamedTuple):
code: int
message: str
data: Any = NODATA # The spec says this value may be omitted
# Union of the two valid result types
Result = Either[ErrorResult, SuccessResult]
def Success(*args: Any, **kwargs: Any) -> Either[ErrorResult, SuccessResult]:
return Right(SuccessResult(*args, **kwargs))
def Error(*args: Any, **kwargs: Any) -> Either[ErrorResult, SuccessResult]:
return Left(ErrorResult(*args, **kwargs))
|
SuccessResult和ErrorResult是python的两个标准对象;Result是oslash中定义的联合对象,在ErrorResult, SuccessResult中二选一,有些类似rust中的Option;Right封装了正确的结果,Left封装了错误的结果。
这一部分需要一些函数式编程的基础,如果不太理解,推荐阅读参考链接。
小结
我们一起学习了JSON-RPC规范,并且了解了Exploding Labs如何使用 现代python 实现该规范,也接触了一些函数式编程的方式。
小技巧
业务有时候需要自己实现一个简单的自增id,我们也许会用全局变量来做:
1
2
3
4
5
6
7
|
start = 0
def gen1():
start +=1
return count
# 调用
id = gen1()
|
全局变量会形成一些污染,利用闭包的特性,我们可以优化成这样:
1
2
3
4
5
6
7
8
9
10
|
def gen2():
start = 0
def incr():
start +=1
return count
return incr
gen = gen2()
# 调用
id = gen()
|
json-rpc里提供了使用yeild关键字实现的版本:
1
2
3
4
5
6
7
8
9
10
11
12
|
def hexadecimal(start: int = 1) -> Iterator[str]:
"""
Incremental hexadecimal numbers.
e.g. 1, 2, 3, .. 9, a, b, etc.
Args:
start: The first value to start with.
"""
while True:
yield "%x" % start
start += 1
|
次密接触隔离前的发文,有些仓促,还望读者朋友们见谅
参考链接