• Redis protocol specification
  • redis-py overview
  • redis-py basic usage
    • RedisCommand
    • Redis connection
    • Connection pool
  • pipeline
  • LuaScript
  • lock

Redis protocol specification

RESP (Redis Serialization Protocol) is the communication protocol between Redis clients and the server. Sample data looks like this:

+OK\r\n
-Error message\r\n
:1000\r\n
$6\r\nfoobar\r\n
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
*3\r\n:1\r\n:2\r\n:3\r\n

The protocol defines five data types:

  1. The + prefix marks a simple string: the string text follows, terminated by \r\n; it is typically used for command status replies.
  2. The - prefix marks an error: an error type and message joined by a space follow, terminated by \r\n.
  3. The : prefix marks an integer: the integer follows, terminated by \r\n.
  4. The $ prefix marks a bulk string of fixed length: the byte length follows, then \r\n, the string content, and a closing \r\n.
  5. The * prefix marks an array: the number of elements and \r\n follow, and each element is itself encoded as one of the four types above.

The protocol also specifies how Null and a few other cases are encoded; see the references section for details. Below is the request and response for LLEN mylist:

C: *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n

S: :48293\r\n
  • The client sends the LLEN mylist command, serialized as a RESP array of length 2 whose two bulk strings are LLEN and mylist.
  • The server replies with the integer 48293, the length of mylist.

The Redis service follows a request-response model, much like HTTP: the server receives a command from the client, processes it, and sends the reply back, essentially one question, one answer. pipeline, pub/sub, and monitor are the exceptions to this.
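To make the protocol and the request-response model concrete, here is a minimal sketch (assuming, for illustration only, a Redis server on 127.0.0.1:6379 and an existing list named mylist) that hand-encodes LLEN mylist as RESP, sends it over a raw socket, and prints the raw reply:

import socket

def encode_resp(*parts):
    """Encode a command as a RESP array of bulk strings."""
    out = [b'*%d\r\n' % len(parts)]
    for part in parts:
        data = part.encode() if isinstance(part, str) else part
        out.append(b'$%d\r\n%s\r\n' % (len(data), data))
    return b''.join(out)

# assumes a local Redis server and a list stored under "mylist"
with socket.create_connection(('127.0.0.1', 6379)) as sock:
    sock.sendall(encode_resp('LLEN', 'mylist'))  # C: *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
    print(sock.recv(1024))                       # S: e.g. b':3\r\n' (an integer reply)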

redis-py source code overview

This article uses redis-py 3.5.3. Its files and packages are:

Name         Description
client       the Redis command API
connection   connections and the connection pool
exceptions   exceptions and errors
lock         the lock implementation
sentinel     the extended Sentinel connection
utils        utility helpers
_compat      compatibility shims across Python versions

redis-py has no third-party dependencies. The code base is not large, around 6,000 lines, but fully understanding it still takes some time and background. Starting from everyday usage of redis-py, which is also what its README covers, this article walks through how these basic features are implemented in the source.

redis-py basic usage

RedisCommand

A simple use of redis-py:

>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
True
>>> r.get('foo')
b'bar'

Tracing the implementation inside redis-py:

# client.py

class Redis(object):

    def __init__(self, host='localhost', port=6379,
                 db=0, ...):
        ...
        connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool
        ...

    def set(self, name, value, ex=None, px=None, nx=False, xx=False, keepttl=False):
        ...
        return self.execute_command('SET', *pieces)

    def get(self, name):
        return self.execute_command('GET', name)

    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        "Execute a command and return a parsed response"
        command_name = args[0]
        conn = self.connection or self.connection_pool.get_connection(command_name, **options)
        conn.send_command(*args)
        return self.parse_response(conn, command_name, **options)

Note: to keep things easy to follow, the example code differs from the real code; complex logic and exception handling have been left out.

  • Redis first creates a connection (via a connection pool) to the Redis server.
  • Redis wraps every Redis command and executes them in a command-pattern style.
  • Executing a command means sending it over the connection it obtained, then parsing and returning the response, which matches the request-response model of the Redis protocol.

Redis connection

Next, look at how the connection is created and used:

# connection.py
class Connection(object):

    def __init__(self, ...):
        self.host = host
        self.port = int(port)
        self._sock = self.connect()

    def connect(self):
        for res in socket.getaddrinfo(self.host, self.port, self.socket_type,
                                      socket.SOCK_STREAM):
            family, socktype, proto, canonname, socket_address = res
            sock = socket.socket(family, socktype, proto)
            ...
            # disable Nagle's algorithm so small command packets go out immediately
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # establish the TCP connection
            sock.connect(socket_address)
            ...
            return sock

    def pack_command(self, *args):
        # serialize one command into a list of RESP-encoded byte chunks
        output = []
        args = tuple(args[0].encode().split()) + args[1:]
        ...
        buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))
        for arg in imap(self.encoder.encode, args):
            arg_length = len(arg)
            buff = SYM_EMPTY.join(
                (buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF))
            output.append(buff)
            output.append(arg)
            ...
        return output

    def send_command(self, *args, **kwargs):
        command = self.pack_command(*args)
        if isinstance(command, str):
            command = [command]
        for item in command:
            self._sock.sendall(item)
        
  • A Connection holds one socket connection.
  • Commands coming from the Redis client are serialized into RESP packets by pack_command (a quick inspection of its output follows below).
  • The packed data is then sent over the socket.
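A quick way to see this serialization in action, assuming redis-py 3.5.x and a locally reachable server (get_connection opens the socket), is to borrow a connection from the pool and inspect what pack_command produces:

import redis

r = redis.Redis()
conn = r.connection_pool.get_connection('SET')
try:
    # pack_command returns a list of RESP-encoded byte chunks ready for sendall()
    print(conn.pack_command('SET', 'foo', 'bar'))
    # roughly: [b'*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n']
finally:
    r.connection_pool.release(conn)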
# connection.py

class Connection(object):

    def __init__(self, ..., parser_class=PythonParser, ...):
        self._parser = parser_class(socket_read_size=socket_read_size)

    def read_response(self):
        response = self._parser.read_response()
        return response

class PythonParser(BaseParser):
    "Plain Python parsing class"
    def __init__(self, socket_read_size):
        self.socket_read_size = socket_read_size
        ...
        self._sock = connection._sock
        self._buffer = SocketBuffer(self._sock,
                                    self.socket_read_size,
                                    connection.socket_timeout)
        self.encoder = connection.encoder
    
    def read_response(self):
        raw = self._buffer.readline()

        byte, response = raw[:1], raw[1:]

        # server returned an error
        if byte == b'-':
            response = nativestr(response)
            ...
        # single value
        elif byte == b'+':
            pass
        # int value
        elif byte == b':':
            response = long(response)
        # bulk response
        elif byte == b'$':
            length = int(response)
            response = self._buffer.read(length)
        # multi-bulk response
        elif byte == b'*':
            length = int(response)
            response = [self.read_response() for i in xrange(length)]
        if isinstance(response, bytes):
            response = self.encoder.decode(response)
        return response
    
  • The Connection creates a parser that reads and interprets the server's responses.
  • The default PythonParser reads socket data through a SocketBuffer.
  • read_response implements the RESP parsing process: for each \r\n-terminated line, the first byte gives the reply type and the rest is the payload; a multi-bulk reply additionally loops to read the following elements. It is worth reading this side by side with the protocol section and the request-sending code above.

PythonParser is the pure-Python implementation. For better performance you can additionally install hiredis, which provides HiredisParser, a C-based parser.
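A quick way to check which parser will be used, assuming redis-py 3.5.x where the flag and the DefaultParser alias live in redis.connection:

import redis.connection

# True means the hiredis package was importable, so HiredisParser becomes DefaultParser
print(redis.connection.HIREDIS_AVAILABLE)
print(redis.connection.DefaultParser)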

Connection pool

redis-py uses a connection pool to improve efficiency. Using it involves three steps: create the pool, take a usable connection from the pool to execute a command, and release the connection when done. Roughly:

# client.py
pool = ConnectionPool(**kwargs)
conn = pool.get_connection(command_name, **options)

try:
    conn.send_command(*args)
    ...
finally:
    ...
    pool.release(conn)

Always make sure pooled connections are released. You can use try/finally or a context manager; here the former is used.

The connection pool itself is implemented as follows:

# connection.py
class ConnectionPool(object):
    
    def __init__(...):
        self._available_connections = []
        self._in_use_connections = set()
    
    def make_connection(self):
        "Create a new connection"
        return self.connection_class(**self.connection_kwargs)
    
    def get_connection(self, command_name, *keys, **options):
        try:
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        ...
        connection.connect()
        return connection
        
    def release(self, connection):
        "Releases the connection back to the pool"
        
        try:
            self._in_use_connections.remove(connection)
        except KeyError:
            # Gracefully fail when a connection is returned to this pool
            # that the pool doesn't actually own
            pass

        self._available_connections.append(connection)
  • Internally the pool manages all connections with a list of available connections and a set of connections in use.
  • When a connection is requested, an available one is taken from the list first; if none is available, a new connection is created.
  • Every connection handed out is added to the in-use set, and connect() is called in case the socket is not yet established.
  • On release, the connection is removed from the in-use set and appended back to the available list, waiting to be reused.
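In everyday code you rarely call get_connection and release yourself; Redis does that per command. What you typically do is create one pool per process and share it across clients, for example:

import redis

# one pool per process; max_connections caps how many sockets may be opened
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)

r1 = redis.Redis(connection_pool=pool)
r2 = redis.Redis(connection_pool=pool)  # shares the same sockets as r1

r1.set('foo', 'bar')
print(r2.get('foo'))  # b'bar'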

At this point we have basically traced the full path of a Redis command execution:

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('foo', 'bar')

pipeline

Redis also supports the pipeline mode, which lets you send a batch of commands and then fetch all of their results at once:

>>> r = redis.Redis(...)
>>> pipe = r.pipeline()
>>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
[True, True, 6]

Pipeline inherits from Redis and extends it:

class Pipeline(Redis):
    def __init__(self, ...):
        self.command_stack = []

    def execute_command(self, *args, **options):
        # commands are only queued here; nothing is sent yet
        self.command_stack.append((args, options))
        return self

    def execute(self, raise_on_error=True):
        "Execute all the commands in the current pipeline"
        stack = self.command_stack
        conn = self.connection_pool.get_connection('MULTI')
        execute = self._execute_pipeline
        return execute(conn, stack, raise_on_error)

    def _execute_pipeline(self, connection, commands, raise_on_error):
        # build up all commands into a single request to increase network perf
        all_cmds = connection.pack_commands([args for args, _ in commands])
        connection.send_packed_command(all_cmds)

        response = []
        for args, options in commands:
            response.append(
                self.parse_response(connection, args[0], **options))

        return response
  • Pipeline uses a stack to temporarily store the batched commands, and each call returns self, which is what makes the chained syntax possible.
  • The commands are only really sent when execute() is called.
  • After sending, the responses are read back one by one and packed into a single list to return.
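Two usage details worth knowing: r.pipeline() wraps the batch in MULTI/EXEC by default (transaction=True), and the pipeline works as a context manager that resets itself on exit. A short example:

import redis

r = redis.Redis()

# transaction=False sends a plain pipeline without MULTI/EXEC around it
with r.pipeline(transaction=False) as pipe:
    pipe.set('foo', 'bar')
    pipe.incr('counter')
    results = pipe.execute()  # one network round trip, one list of replies

print(results)  # e.g. [True, 1]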

LuaScript

Redis can run Lua scripts server-side, which is handy for transaction-like atomic operations. Usage looks like this:

>>> r = redis.Redis()
>>> lua = """
... local value = redis.call('GET', KEYS[1])
... value = tonumber(value)
... return value * ARGV[1]"""
>>> multiply = r.register_script(lua)
>>> r.set('foo', 2)
>>> multiply(keys=['foo'], args=[5])
10
  • Inside the Lua script, the KEYS and ARGV arrays carry the parameters: KEYS[1] (Lua arrays start at 1) is the key name and ARGV[1] is the multiplier.
  • The script has to be registered first.
  • redis-py passes the parameters to the script, executes it, and returns the result.

How the script machinery works:

# client.py
class Redis(object):

    def register_script(self, script):
        return Script(self, script)
        
    def script_load(self, script):
        "Load a Lua ``script`` into the script cache. Returns the SHA."
        return self.execute_command('SCRIPT LOAD', script)
    
    def evalsha(self, sha, numkeys, *keys_and_args):
        return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)

class Script(object):
    "An executable Lua script object returned by ``register_script``"

    def __init__(self, registered_client, script):
        self.registered_client = registered_client
        self.script = script
        # Precalculate and store the SHA1 hex digest of the script.
        ...
        self.sha = hashlib.sha1(script).hexdigest()

    def __call__(self, keys=[], args=[], client=None):
        "Execute the script, passing any required ``args``"
        client = client or self.registered_client
        args = tuple(keys) + tuple(args)
        # make sure the Redis server knows about the script
        ...
        try:
            return client.evalsha(self.sha, len(keys), *args)
        except NoScriptError:
            # Maybe the client is pointed to a different server than the client
            # that created this instance?
            # Overwrite the sha just in case there was a discrepancy.
            self.sha = client.script_load(self.script)
            return client.evalsha(self.sha, len(keys), *args)
  • The Lua script is loaded into the Redis server with SCRIPT LOAD, which returns a SHA1 digest; the SHA can be reused so the same script is not loaded over and over.
  • The script is then executed with EVALSHA.
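The same two commands can also be driven by hand, which is essentially what Script.__call__ does; a small sketch reusing the multiplier script from above:

import redis

r = redis.Redis()
lua = """
local value = redis.call('GET', KEYS[1])
return tonumber(value) * ARGV[1]"""

sha = r.script_load(lua)            # SCRIPT LOAD -> SHA1 digest of the script
r.set('foo', 2)
print(r.evalsha(sha, 1, 'foo', 5))  # EVALSHA <sha> 1 foo 5  -> 10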

lock

redis-py also provides a global lock implementation that can synchronize across processes:

try:
    with r.lock('my-lock-key', blocking_timeout=5) as lock:
        pass   # code you want executed only after the lock has been acquired
except LockError:
    pass       # the lock wasn't acquired

Its implementation:

# lock.py
class Lock(object):

    LUA_RELEASE_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        redis.call('del', KEYS[1])
        return 1
    ""
    
    def __init__(...):
        ...
        self.redis = redis
        self.name = name
        self.local = threading.local() if self.thread_local else dummy()
        self.local.token = None
        cls = self.__class__
        cls.lua_release = self.redis.register_script(cls.LUA_RELEASE_SCRIPT)
  
    def __enter__(self):
        # force blocking, as otherwise the user would have to check whether
        # the lock was actually acquired or not.
        if self.acquire(blocking=True):
            return self
        raise LockError("Unable to acquire lock within the time specified")

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
        
    def acquire(self, blocking=None, blocking_timeout=None, token=None):
        ...
        token = uuid.uuid1().hex.encode()
        # self.timeout (seconds) becomes the PX expiry in milliseconds
        self.redis.set(self.name, token, nx=True, px=int(self.timeout * 1000))
        ...
        self.local.token = token
        ...
    
    def release(self):
        expected_token = self.local.token
        self.local.token = None
        self.lua_release(keys=[self.name],
                                     args=[expected_token],
                                     client=self.redis)
  • LUA_RELEASE_SCRIPT deletes the token atomically with a Lua script.
  • The lock stores the token in a thread-local variable so that concurrent threads behave correctly.
  • __enter__ and __exit__ implement the context-manager protocol (the with statement), guaranteeing that the lock is acquired and released properly.
  • Acquiring the lock generates a temporary token and SETs it in the Redis server; the token has a lifetime, so the lock can expire and be released automatically.
  • Releasing clears the thread-local variable and the key stored in the Redis server.
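Besides the with block, the lock can be driven manually. A sketch assuming redis-py 3.5.x, where timeout is the auto-expiry in seconds and thread_local decides where the token is kept:

import redis
from redis.exceptions import LockError

r = redis.Redis()
lock = r.lock('my-lock-key', timeout=10, blocking_timeout=5, thread_local=True)

if lock.acquire():          # blocks for up to blocking_timeout seconds
    try:
        pass                # critical section
    finally:
        try:
            lock.release()  # runs LUA_RELEASE_SCRIPT; only the token owner may release
        except LockError:
            pass            # the key expired and someone else may own the lock now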

TODO

The publish/subscribe, Monitor, Sentinel, and transaction parts of the source are, in my view, off the main path here and will be covered in a follow-up.

References