Python 操作 Redis 必备神器:redis-py 源码阅读
△点击上方“Python猫”关注 ,回复“2”加入交流群
作者:肖恩
来源:游戏不存在
redis协议规范
redis-py概述
redis-py基础使用
RedisCommand
Redis连接
连接池
pipeline
LuaScript
lock
redis协议规范
RESP(Redis Serialization Protocol)是Redis客户端和服务端的通讯协议。数据示例如下:
+OK\r\n
-Error message\r\n
:1000\r\n
$6\r\nfoobar\r\n
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
*3\r\n:1\r\n:2\r\n:3\r\n
协议定义了5种类型:
+
前缀表示字符串,后接字符串文本,以\r\n
结尾,通常用于命令结果-
前缀表示异常信息,后接以空格连接的两个字符串,以\r\n
结尾:
前缀表示整数,后接整数,以\r\n
结尾$
前缀表示定长的字符串,后接字符串长度,\r\n
和字符串文本,以\r\n
结尾*
前缀表示数组,后接数组的长度和\r\n
,数组的每个元素可以由上面4种类型构成
协议还约定了Null等的实现,详情请看参考链接部分。下面示例了 LLEN mylist
的请求和响应
C: *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
S: :48293\r\n
客户端发送了 LLEN mylist指令,指令序列化成RESP长度为2的数组,2个定长字符串分别是llen和mylist。
服务端响应整数48293,即mylist数据的长度。
Request-Response model是redis服务的请求响应模型,可以对比http协议的模式。redis服务端响应客户端的指令,处理后响应回复客户端,可以简单理解为一问一答。当然pipeline,pub/sub和monitor除外。
Redis-py 源码概述
本文使用的redis-py版本是 3.5.3
, 文件及包信息是:
名称 | 描述 |
---|---|
client | redis的api |
connection | 连接,连接池等 |
exceptions | 异常和错误 |
lock | 锁的实现 |
sentinel | 扩展的哨兵连接 |
utils | 工具 |
_compat | 都版本适配包 |
redis-py未依赖其它的包,代码量虽然不多,6000行左右,但是100%理解还是需要一定的时间和基础。本文从redis-py日常使用出发,也是redis-py的README中内容,介绍这些基础功能在源码中的实现。
redis-py基础使用
RedisCommand
redis-py的简单使用:
>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
True
>>> r.get('foo')
b'bar'
追踪redis-py的实现:
# client.py
class Redis(object)
def __init__(self, host='localhost', port=6379,
db=0, ..):
...
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
...
def set(self, name, value, ex=None, px=None, nx=False, xx=False, keepttl=False)
...
return self.execute_command('SET', *pieces)
def get(self, name):
return self.execute_command('GET', name)
# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
conn = self.connection or pool.get_connection(command_name, **options)
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
注意:为了便于理解,示例代码和实际的代码有出入,省去了复杂的逻辑和异常等
redis首先创造了一个到redis服务的连接,
redis包装了redis的所有指令,使用命令模式执行指令。
执行命令就是使用创建的连接发送指令,然后解析和获取响应。这和redis协议上的Request-Response model行为一致。
Redis连接
继续查看连接的创建和执行:
# connection.py
class Connection(object)
def __init__(...):
self.host = host
self.port = int(port)
self._sock = connect()
def connect():
for res in socket.getaddrinfo(self.host, self.port, self.socket_type,
socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
sock = socket.socket(family, socktype, proto)
...
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# connect
sock.connect(socket_address)
...
return sock
def pack_command(self, *args):
command = []
args = tuple(args[0].encode().split()) + args[1:]
...
buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))
for arg in imap(self.encoder.encode, args):
buff = SYM_EMPTY.join(
(buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF))
output.append(buff)
output.append(arg)
...
return command
def send_command(self, *args, **kwargs):
command = self.pack_command(args)
if isinstance(command, str):
command = [command]
for item in command:
self._sock.sendall(*args, **kwargs)
connection维持了一个socket连接
收到redis的命令使用pack_command进行RESP的序列化打包
数据包使用socket发送
# connection.py
class Connection(object)
def __init__(...,parser_class=PythonParser,...);
self._parser = parser_class(socket_read_size=socket_read_size)
def read_response(self):
response = self._parser.read_response()
return response
class PythonParser(BaseParser):
"Plain Python parsing class"
def __init__(self, socket_read_size):
self.socket_read_size = socket_read_size
...
self._sock = connection._sock
self._buffer = SocketBuffer(self._sock,
self.socket_read_size,
connection.socket_timeout)
self.encoder = connection.encoder
def read_response(self):
raw = self._buffer.readline()
byte, response = raw[:1], raw[1:]
# server returned an error
if byte == b'-':
response = nativestr(response)
...
# single value
elif byte == b'+':
pass
# int value
elif byte == b':':
response = long(response)
# bulk response
elif byte == b'$':
length = int(response)
response = self._buffer.read(length)
# multi-bulk response
elif byte == b'*':
length = int(response)
response = [self.read_response() for i in xrange(length)]
if isinstance(response, bytes):
response = self.encoder.decode(response)
return response
connection创建了一个parser用于读取和解析服务响应
默认的PythonParser使用SocketBuffer读取socket数据
read_response实现了RESP协议的解析过程。对于每行数据
\r\n
,第一个字符是响应类型,剩下的数据内容,如果是multi-bulk还需要循环读取多行。建议对比协议和发送请求进行详细阅读理解。
PythonParser是pure-python的实现,如果希望更高效,可以额外安装hiredis,会提供一个基于c的解析器 HiredisParser
。
连接池
redis-py使用连接池来提高执行效率,主要的使用方法3个步骤,创建连接池,从连接池中获取有效连接执行命令,完成后释放连接,语句如下:
# redis.py
connection_pool = ConnectionPool(**kwargs)
pool.get_connection(command_name, **options)
try:
conn.send_command(*args)
...
finally:
...
pool.release(conn)
连接池一定要注意释放,可以用try/finally,也可以使用上下文装饰器,这里使用了前者
连接池的具体实现:
# connection.py
class ConnectionPool(object):
def __init__(...):
self._available_connections = []
self._in_use_connections = set()
def make_connection(self):
"Create a new connection"
return self.connection_class(**self.connection_kwargs)
def get_connection(self, command_name, *keys, **options)
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
...
connection.connect()
return connection
def release(self, connection):
"Releases the connection back to the pool"
try:
self._in_use_connections.remove(connection)
except KeyError:
# Gracefully fail when a connection is returned to this pool
# that the pool doesn't actually own
pass
self._available_connections.append(connection)
连接池内部使用可用连接数组和正在使用连接集合管理所有连接
获取连接时候,优先从可用连接数组获取;没有可用连接会创建新的连接
所有获取到的连接会加入正在使用连接, 如果当前连接未连接会先建立连接
连接释放时会从正在使用连接集合中移除,然后加入可用连接数组数组,等待复用
到这里,我们基本理顺了一个redis指令执行的流程:
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('foo', 'bar')
pipeline
redis还支持pipeline管线模式,可以批量发送一些命令,然后获取所有的结果:
>>> r = redis.Redis(...)
>>> pipe = r.pipeline()
>>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
[True, True, 6]
pipeline的继承自redis,做了一些扩展
class Pipeline(Redis)
def __init__(...):
self.command_stack = []
def execute_command(self, *args, **kwargs):
self.command_stack.append((args, options))
return self
def execute(self, raise_on_error=True):
"Execute all the commands in the current pipeline"
stack = self.command_stack
execute = self._execute_pipeline
execute(conn, stack, raise_on_error)
def _execute_pipeline(self, connection, commands, raise_on_error):
# build up all commands into a single request to increase network perf
all_cmds = connection.pack_commands([args for args, _ in commands])
connection.send_packed_command(all_cmds)
response = []
for args, options in commands:
response.append(
self.parse_response(connection, args[0], **options))
return response
pipeline使用一个stack来临时存储批量发送的命令,同时返回自身,这样可以支持链式语法
execute时候才正式发送指令
发送指令后再依次获取服务响应,打包称一个数组统一返回
LuaScript
redis使用lua脚本来处理事务,使用方法如下:
>>> r = redis.Redis()
>>> lua = """
... local value = redis.call('GET', KEYS[1])
... value = tonumber(value)
... return value * ARGV[1]"""
>>> multiply = r.register_script(lua)
>>> r.set('foo', 2)
>>> multiply(keys=['foo'], args=[5])
10
lua脚本中定义了KEYS和ARGV两个数组用于接受参数,KEY的第一个值(lua数组从1开始)是key的名称,ARGV的第一个值是倍数
脚本需要进行注册
redis-py中把参数传递给脚本并执行得到结果
脚本的实现原理:
# client.py
class Redis(object):
def register_script(self, script):
return Script(self, script
def script_load(self, script):
"Load a Lua ``script`` into the script cache. Returns the SHA."
return self.execute_command('SCRIPT LOAD', script)
def evalsha(self, sha, numkeys, *keys_and_args):
return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
class Script(object):
"An executable Lua script object returned by ``register_script``"
def __init__(self, registered_client, script):
self.registered_client = registered_client
self.script = script
# Precalculate and store the SHA1 hex digest of the script.
...
self.sha = hashlib.sha1(script).hexdigest()
def __call__(self, keys=[], args=[], client=None):
"Execute the script, passing any required ``args``"
args = tuple(keys) + tuple(args)
# make sure the Redis server knows about the script
...
try:
return client.evalsha(self.sha, len(keys), *args)
except NoScriptError:
# Maybe the client is pointed to a differnet server than the client
# that created this instance?
# Overwrite the sha just in case there was a discrepancy.
self.sha = client.script_load(self.script)
return client.evalsha(self.sha, len(keys), *args
lua脚本通过
script load
加载到redis服务,并获得一个sha值,sha值可以重用,避免多次加载同一脚本通过
evalsha
执行脚本
lock
redis-py还提供了一个全局锁的实现, 可以跨进程同步:
try:
with r.lock('my-lock-key', blocking_timeout=5) as lock:
# code you want executed only after the lock has been acquired
except LockError:
# the lock wasn't acquired
下面是其实实现:
# lock.py
class Lock(object):
LUA_RELEASE_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
redis.call('del', KEYS[1])
return 1
""
def __init__(...):
...
self.redis = redis
self.name = name
self.local = threading.local() if self.thread_local else dummy()
self.local.token = None
cls = self.__class__
cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
def __enter__(self):
# force blocking, as otherwise the user would have to check whether
# the lock was actually acquired or not.
if self.acquire(blocking=True):
return self
raise LockError("Unable to acquire lock within the time specified")
def __exit__(self, exc_type, exc_value, traceback):
self.release()
def acquire(self, blocking=None, blocking_timeout=None, token=None):
...
token = uuid.uuid1().hex.encode()
self.redis.set(self.name, token, nx=True, px=timeout)
...
self.local.token = token
...
def release(self):
expected_token = self.local.token
self.local.token = None
self.lua_release(keys=[self.name],
args=[expected_token],
client=self.redis)
LUA_RELEASE_SCRIPT使用lua脚本来处理删除token的事务
lock使用线程变量来存储token值,保证多线程并发可以正常
_enter和_exit是装饰器语法,保证可以合法的获取和释放
申请锁的时候获取一个临时的token,然后设置到redis服务中,这个token是有生命周期的,可以超时自动释放。
释放的时候清理线程本地变量和redis服务中的变量
TODO
源码中的 publish/subscibe
, Monitor
, Sentinel
和事务等内容,个人认为并不在主线任务上,留待后续再行介绍。
参考链接
https://redis.io/topics/protocol
https://github.com/andymccurdy/redis-py
https://pypi.org/project/hiredis/#description
近期热门文章推荐:
分享与在看是对我最大的支持!