signal在flask/django中都是很重要的解耦手段。flask的signal依赖blinker实现,django的signal也很类似。blinker库是纯python实现的代码简单,功能强大的signal库。本文我们从blinker开始,一起了解python-web开发的signal机制:

  • blinker的api
  • blinker-signal的实现
  • flask-signal的实现
  • django-signal的实现
  • weakref介绍
  • 小结
  • 小技巧

blinker简介

blinker源码使用 1.4 版本, 项目结构如下:

文件 描述
base.py 核心逻辑
_saferef.py 安全引用相关逻辑
_utilities.py 工具类

blinker的API

blinker的api使用示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from blinker import signal

def subscriber1(sender):
    print("1 Got a signal sent by %r" % sender)

def subscriber2(sender):
    print("2 Got a signal sent by %r" % sender)

ready = signal('ready')
print(ready)
ready.connect(subscriber1)
ready.connect(subscriber2)
ready.send("go")

示例的日志输出:

1
2
3
<blinker.base.NamedSignal object at 0x7f93a805ad00; 'ready'>
1 Got a signal sent by 'go'
2 Got a signal sent by 'go'

可以看到signal是发布/订阅模式。或者换个更常见的说法,事件中心:

  • ready = signal('ready') 创建名为ready的事件中心
  • ready.connect(subscriber1) 给ready事件中心添加事件监听器
  • ready.send("go") 向ready事件中心派发事件,这样事件监听器会收到事件并进行处理

signal的实现

signal默认单例,提供开箱即用的API:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class NamedSignal(Signal):
    """A named generic notification emitter."""

    def __init__(self, name, doc=None):
        Signal.__init__(self, doc)
        self.name = name

class Namespace(dict):

    def signal(self, name, doc=None):
        try:
            return self[name]
        except KeyError:
            return self.setdefault(name, NamedSignal(name, doc))

signal = Namespace().signal

需要说明一下的是,signal的单例是和name绑定的。同一个名称得到同一个NamedSignal对象,不同名称得到的NamedSignal对象不一样。

NamedSignal的父类Signal的构造方法,包括1)事件接收器字典receivers:以事件接收器id为key和事件接收器为value;2)接收器ID-发送器ID的字典:以接收器ID为key和发送器ID集合为value;3)和2类似的字典,只不过是反向的,key为发送器ID,value为接收器集合。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ANY = symbol('ANY')

class Signal(object):

    ANY = ANY
    
    def __init__(self, doc=None)
        self.receivers = {}
        self._by_receiver = defaultdict(set)
        self._by_sender = defaultdict(set)
        ...

Signal的connect函数添加消息接收器,可以看到sender和receiver是多对多的关系。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def connect(self, receiver, sender=ANY, weak=True):
    receiver_id = hashable_identity(receiver)
    receiver_ref = receiver
    sender_id = ANY_ID

    self.receivers.setdefault(receiver_id, receiver_ref)
    self._by_sender[sender_id].add(receiver_id)
    self._by_receiver[receiver_id].add(sender_id)
    del receiver_ref

    return receiver

Signal的send函数将消息发送给所有关注该sender的接收器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def send(self, *sender, **kwargs):
    sender = sender[0]
    # 循环执行所有的receiver
    return [(receiver, receiver(sender, **kwargs))
            for receiver in self.receivers_for(sender)]

def receivers_for(self, sender):
    sender_id = hashable_identity(sender)
    # 根据sender_id找receiver_id
    if sender_id in self._by_sender:
        # 2个set的合集 
        ids = (self._by_sender[ANY_ID] |
               self._by_sender[sender_id])
    else:
        ids = self._by_sender[ANY_ID].copy()
    for receiver_id in ids:
        receiver = self.receivers.get(receiver_id)
        if receiver is None:
            continue
        # 迭代器
        yield receiver

有始有终,Signal使用disconnect函数注销消息的接收器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def disconnect(self, receiver, sender=ANY):
    sender_id = ANY_ID
    receiver_id = hashable_identity(receiver)
    self._disconnect(receiver_id, sender_id)

def _disconnect(self, receiver_id, sender_id):
    if sender_id == ANY_ID:
        if self._by_receiver.pop(receiver_id, False):
            for bucket in self._by_sender.values():
                bucket.discard(receiver_id)
        self.receivers.pop(receiver_id, None)
    else:
        self._by_sender[sender_id].discard(receiver_id)
        self._by_receiver[receiver_id].discard(sender_id)

为了便于理解signal机制,我们暂时忽略了weakref相关的代码,稍后再进行介绍。

flask-signal的实现

flask-signal依赖blinker的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# flask.signals.py

from blinker import Namespace

_signals = Namespace()

template_rendered = _signals.signal("template-rendered")
before_render_template = _signals.signal("before-render-template")
request_started = _signals.signal("request-started")
request_finished = _signals.signal("request-finished")
request_tearing_down = _signals.signal("request-tearing-down")
got_request_exception = _signals.signal("got-request-exception")
appcontext_tearing_down = _signals.signal("appcontext-tearing-down")
appcontext_pushed = _signals.signal("appcontext-pushed")
appcontext_popped = _signals.signal("appcontext-popped")
message_flashed = _signals.signal("message-flashed")

从上面代码可以看到flask使用blinker预制了多个signal。以request_started为例, flask在处理request时候会向request_started派发事件:

1
2
3
4
5
6
7
8
# flask.app.py

from .signals import request_started

def full_dispatch_request(self):
    ...
    request_started.send(self)
    ...

我们可以在自己的代码中,这样注册事件监听:

1
2
3
4
5
def log_request(sender, **extra):
    sender.logger.debug('Request context is set up')

from flask import request_started
request_started.connect(log_request, app)

这样就可以很方便的使用signal获取到flask在各个阶段的数据。

django-signal的实现

django-signal虽然是独立实现,但是模式和blinker非常类似。Signal构造函数创建了一个对象,充当事件中心。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# django/dispatch/dispatcher.py

def _make_id(target):
    if hasattr(target, '__func__'):
        return (id(target.__self__), id(target.__func__))
    return id(target)

NONE_ID = _make_id(None)

# A marker for caching
NO_RECEIVERS = object()

class Signal:

    def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.
        """
        self.receivers = []
        self.lock = threading.Lock()
        self.use_caching = use_caching
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False

connect核心功能就是为事件监听器构建唯一标识(receiver_id,sender_id),然后加入receivers数组。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
    from django.conf import settings

    lookup_key = (_make_id(receiver), _make_id(sender))

    ref = weakref.ref
    receiver_object = receiver
    receiver = ref(receiver)

    with self.lock:
        if not any(r_key == lookup_key for r_key, _ in self.receivers):
            self.receivers.append((lookup_key, receiver))

send函数和blinker的send类似:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def send(self, sender, **named):
    return [
        (receiver, receiver(signal=self, sender=sender, **named))
        for receiver in self._live_receivers(sender)
    ]

def _live_receivers(self, sender):
    with self.lock:
        senderkey = _make_id(sender)
        receivers = []
        for (receiverkey, r_senderkey), receiver in self.receivers:
            if r_senderkey == NONE_ID or r_senderkey == senderkey:
                receivers.append(receiver)
    ...
    non_weak_receivers = []
    for receiver in receivers:
        non_weak_receivers.append(receiver)
    return non_weak_receivers

django-signal额外提供了一个receiver装饰器,方便业务使用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def receiver(signal, **kwargs):
    
    def _decorator(func):
        if isinstance(signal, (list, tuple)):
            for s in signal:
                s.connect(func, **kwargs)
        else:
            signal.connect(func, **kwargs)
        return func
    return _decorator

django的model中额外包装了ModelSignal类并且预制了一些signal:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class ModelSignal(Signal):

    def _lazy_method(self, method, apps, receiver, sender, **kwargs):
        from django.db.models.options import Options

        # This partial takes a single optional argument named "sender".
        partial_method = partial(method, receiver, **kwargs)
        if isinstance(sender, str):
            apps = apps or Options.default_apps
            apps.lazy_model_operation(partial_method, make_model_tuple(sender))
        else:
            return partial_method(sender)

    def connect(self, receiver, sender=None, weak=True, dispatch_uid=None, apps=None):
        self._lazy_method(
            super().connect, apps, receiver, sender,
            weak=weak, dispatch_uid=dispatch_uid,
        )

...
# 定义模型各个阶段的signal
pre_init = ModelSignal(use_caching=True)
post_init = ModelSignal(use_caching=True)

pre_save = ModelSignal(use_caching=True)
post_save = ModelSignal(use_caching=True)

pre_delete = ModelSignal(use_caching=True)
post_delete = ModelSignal(use_caching=True)

m2m_changed = ModelSignal(use_caching=True)

pre_migrate = Signal()

signal的使用方式在receiver装饰器的注释中有介绍:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
"""
A decorator for connecting receivers to signals. Used by passing in the
signal (or list of signals) and keyword arguments to connect::

    @receiver(post_save, sender=MyModel)
    def signal_receiver(sender, **kwargs):
        ...

    @receiver([post_save, post_delete], sender=MyModel)
    def signals_receiver(sender, **kwargs):
        ...
"""

这样利用signal机制,可以对MyModel进行一些额外的逻辑处理,又避免了代码的硬耦合。

weakref 介绍

了解了signal的各种实现和使用后,我们再回头学习blinker-signal中另外一个环节weakref。weakref可以显著提高signal的性能, 请看下面示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def test_weak_value_dict(cache):
    c_list = []

    class C:
        def method(self):
            return ("method called!", id(self))

    c1 = C()
    c2 = C()
    c3 = C()
    c_list.append(c1)
    c_list.append(c2)
    c_list.append(c3)
    del c1, c2, c3

    def do_cache(cache, name, target):
        cache[name] = target

    for idx, target in enumerate(c_list):
        do_cache(cache, idx, target)

    for k, v in cache.items():
        print("before", k, v.method())
    del c_list
    gc.collect()
    for x, y in cache.items():
        print("after", x, y.method())

test_weak_value_dict({})
print("==" * 10)
test_weak_value_dict(weakref.WeakValueDictionary())

在test_weak_value_dict函数中,创建了3个对象,将对象放到一个列表和cache中,完成后再删除对象和对象列表并进行gc。如果cache的实现是set,那么gc后cache中任然存在3个对象,也就是对象不会回收;如果是使用WeakValueDictionary实现的cache,则部分对象进行了回收。在一个事件中心,如果监听函数取消后却无法释放回收,内存会持续增长。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
before 0 ('method called!', 140431874960640)
before 1 ('method called!', 140431874959440)
before 2 ('method called!', 140431874959968)
after 0 ('method called!', 140431874960640)
after 1 ('method called!', 140431874959440)
after 2 ('method called!', 140431874959968)
====================
before 0 ('method called!', 140431875860416)
before 1 ('method called!', 140431875860128)
before 2 ('method called!', 140431876163136)
after 2 ('method called!', 140431876163136)

为什么WeakValueDictionary还保留最后一个数据呢? 欢迎大家评论区交流

signal 小结

到这里我们可以知道blinker/flask/django的signal都是单纯的python消息中心,和我们之前在gunicorn中使用的系统 signal 完全不一样。消息中心,可以用来进行业务逻辑的解耦,一般就包括三步:

  • 注册监听器
  • 派发事件
  • 注销监听器

小技巧

blinker中提供了一种 单例模式 的实现参考,我把它叫做 分组单例 , 组名相同会得到同一个对象实例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class _symbol(object):

    def __init__(self, group):
        """Construct a new group symbol."""
        # 原文是name,我把它换成了group,感觉这样更容易理解一些
        self.__group__ = self.group = group

    def __reduce__(self):
        return symbol, (self.group,)

    def __repr__(self):
        return self.group
_symbol.__group__ = 'symbol'


class symbol(object):
    """A constant symbol.
    # group相同的symbol是同一个对象
    >>> symbol('foo') is symbol('foo')
    True
    >>> symbol('foo')
    foo
    """
    symbols = {}

    def __new__(cls, group):
        try:
            return cls.symbols[group]
        except KeyError:
            return cls.symbols.setdefault(group, _symbol(group))

ANY = symbol('ANY')  # 单例

参考链接: