本章我们从celery的蓝图学习celery的实现细节。
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
本文是是celery源码解析的第七篇,在前六篇里分别介绍了:
- 神器 celery 源码解析- vine实现Promise功能
- 神器 celery 源码解析- py-amqp实现AMQP协议
- 神器 celery 源码解析- kombu,一个python实现的消息库
- 神器 celery 源码解析- kombu的企业级算法
- 神器 celery 源码解析- celery启动流程分析
- 神器 celery 源码解析- celery启动日志跟踪
蓝图设计
celery的蓝图,官方的解释是 A directed acyclic graph of reusable components ,翻译过来就是 可重用组件的有向无环图 。有WorkController(也叫worker)和Consumer两个蓝图,每个蓝图又由一些step组成,这些step根据依赖关系(requires)组成下面的树结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
WorkController(Blueprint)
|- StateDB
|- Timer
|- Hub
|- Pool
|- WorkerComponent(Autoscaler)
|- Beat
|- Consumer(Blueprint)
|- Connection
|- Agent
|- Events
|- Mingle
|- Gossip
|- Tasks
|- Control
|- Heart
|- Evloop
|
其中Consumer是WorkController的一个step,这个step又启动了一个Consumer的蓝图,形成一个蓝图嵌蓝图的结构。蓝图这个词,可以理解为celery启动的时候需要一些步骤,这些步骤是有依赖顺序的,同级的步骤构成一个蓝图。
Worker蓝图包括{StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
七个步骤,一般情况下仅仅启动了其中的三个Hub, Pool, Consumer
:
1
2
3
4
5
6
7
8
9
|
[2021-11-24 15:53:12,984: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2021-11-24 15:53:12,988: DEBUG/MainProcess] | Worker: Building graph...
[2021-11-24 15:53:12,988: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
...
[2021-11-24 15:53:13,062: DEBUG/MainProcess] | Worker: Starting Hub
[2021-11-24 15:53:13,062: DEBUG/MainProcess] ^-- substep ok
[2021-11-24 15:53:13,062: DEBUG/MainProcess] | Worker: Starting Pool
[2021-11-24 15:53:13,410: DEBUG/MainProcess] ^-- substep ok
[2021-11-24 15:53:13,411: DEBUG/MainProcess] | Worker: Starting Consumer
|
这七个蓝图的顺序和配置的顺序是有差异的:
1
2
3
4
5
6
7
8
9
|
default_steps = {
'celery.worker.components:Hub',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
}
|
Consumer蓝图包括{Connection, Events, Mingle, Tasks, Control, Gossip, Agent, Heart, event loop}
十个步骤,一般情况下除了Agent
, 其它都会启动。
1
2
3
4
|
[2021-11-24 15:53:13,005: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2021-11-24 15:53:13,005: DEBUG/MainProcess] | Consumer: Building graph...
[2021-11-24 15:53:13,038: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Gossip, Agent, Heart, event loop}
...
|
Blueprint
主要有2个实现函数:apply
创建各个step,start
启动各个step:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
def apply(self, parent, **kwargs):
# 创建step
self._debug('Preparing bootsteps.')
order = self.order = []
steps = self.steps = self.claim_steps()
self._debug('Building graph...')
for S in self._finalize_steps(steps):
step = S(parent, **kwargs)
steps[step.name] = step
order.append(step)
self._debug('New boot order: {%s}',
', '.join(s.alias for s in self.order))
for step in order:
# 隐式的创建step
step.include(parent)
return self
def start(self, parent):
# 启动蓝图
...
for i, step in enumerate(s for s in parent.steps if s is not None):
self._debug('Starting %s', step.alias)
self.started = i + 1
step.start(parent)
logger.debug('^-- substep ok')
|
Step
和子类StartStopStep
使用enbled属性,决定step步骤的是否启用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
enabled = True
def include_if(self, parent):
return self.enabled
def _should_include(self, parent):
if self.include_if(parent):
return True, self.create(parent)
return False, None
def include(self, parent):
inc, ret = self._should_include(parent)
if inc:
self.obj = ret
parent.steps.append(self)
return inc
|
比如默认情况下StateDB会根据参数关闭:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@click.option('-S',
'--statedb',
cls=CeleryOption,
type=click.Path(),
callback=lambda ctx, _, value: value or ctx.obj.app.conf.worker_state_db,
help_group="Worker Options",
help="Path to the state database. The extension '.db' may be "
"appended to the filename.")
...
class StateDB(bootsteps.Step):
"""Bootstep that sets up between-restart state database file."""
def __init__(self, w, **kwargs):
self.enabled = w.statedb
...
|
Step
类还有requires和last两个属性,blueprint可以根据这两个属性建立所有步骤的先后顺序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def _find_last(self):
# 查找steps的尾
return next((C for C in self.steps.values() if C.last), None)
def _firstpass(self, steps):
# 查找依赖关系
for step in steps.values():
step.requires = [symbol_by_name(dep) for dep in step.requires]
stream = deque(step.requires for step in steps.values())
# 广度优先的遍历
while stream:
for node in stream.popleft():
node = symbol_by_name(node)
if node.name not in self.steps:
steps[node.name] = node
stream.append(node.requires)
|
Consumer这个特殊的Step是这样嵌套启动Consumer蓝图的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
class Consumer(bootsteps.StartStopStep):
"""Bootstep starting the Consumer blueprint."""
def create(self, w):
# consumer_cls就是Consumer蓝图
c = w.consumer = self.instantiate(
w.consumer_cls, w.process_task,
hostname=w.hostname,
task_events=w.task_events,
init_callback=w.ready_callback,
initial_prefetch_count=prefetch_count,
pool=w.pool,
timer=w.timer,
app=w.app,
controller=w,
hub=w.hub,
worker_options=w.options,
disable_rate_limits=w.disable_rate_limits,
prefetch_multiplier=w.prefetch_multiplier,
)
return c
|
celery将启动过程分成多个step,每个step承担不同的功能,不同的step又组合成多个蓝图,这种方式可以灵活的定义启动流程,并且让业务功能解耦,更易维护。下面我们继续学习其中的一些step。
Connection-Step实现AMQP协议连接
Connection-Step主要功能是创建connection连接:
1
2
3
4
5
6
7
8
9
10
11
|
class Connection(bootsteps.StartStopStep):
"""Service managing the consumer broker connection."""
def __init__(self, c, **kwargs):
c.connection = None
super().__init__(c, **kwargs)
def start(self, c):
# 创建连接
c.connection = c.connect()
info('Connected to %s', c.connection.as_uri())
|
Pool-Step实现并发模型
Pool-Step主要功能是启动一个调度池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# Initialize bootsteps
self.pool_cls = _concurrency.get_implementation(self.pool_cls)
def create(self, w):
...
# 启动concurrency模型
pool = w.pool = self.instantiate(
w.pool_cls, w.min_concurrency,
initargs=(w.app, w.hostname),
maxtasksperchild=w.max_tasks_per_child,
max_memory_per_child=w.max_memory_per_child,
timeout=w.time_limit,
soft_timeout=w.soft_time_limit,
putlocks=w.pool_putlocks and threaded,
lost_worker_timeout=w.worker_lost_wait,
threads=threaded,
max_restarts=max_restarts,
allow_restart=allow_restart,
forking_enable=True,
semaphore=semaphore,
sched_strategy=self.optimization,
app=w.app,
)
...
return pool
|
并发模型主要包括下面一些实现,比如基于fork的多进程,基于eventlet和gevent的协程和多线程等:
1
2
3
4
5
6
7
8
9
10
11
12
|
ALIASES = {
'prefork': 'celery.concurrency.prefork:TaskPool',
'eventlet': 'celery.concurrency.eventlet:TaskPool',
'gevent': 'celery.concurrency.gevent:TaskPool',
'solo': 'celery.concurrency.solo:TaskPool',
'processes': 'celery.concurrency.prefork:TaskPool', # XXX compat alias
'threads': 'celery.concurrency.thread:TaskPool'
}
def get_implementation(cls):
"""Return pool implementation by name."""
return symbol_by_name(cls, ALIASES)
|
在前一篇的日志中,我们知道默认使用的是prefork也就是多线程模式:
1
2
3
4
5
|
class TaskPool(BasePool):
"""Multiprocessing Pool implementation."""
# billiard提供的池模式
BlockingPool = BlockingPool
...
|
TaskPool的实现主要依赖billiard库,我们以后再行介绍,这里简单了解一下celery的并发模型都在concurrency模块之下即可。
Evloop-Step实现事件循环
Evloop-Step是由Consumer blueprint启动:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
class Evloop(bootsteps.StartStopStep):
"""Event loop service.
Note:
This is always started last.
"""
# [2021-11-24 20:08:31,037: DEBUG/MainProcess] | Consumer: Starting event loop
label = 'event loop'
last = True
def start(self, c):
self.patch_all(c)
c.loop(*c.loop_args())
|
这里的loop在consumer中定义, 默认使用异步循环(asynloop)和同步循环(synloop)中的同步循环:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
def synloop(obj, connection, consumer, blueprint, hub, qos,
heartbeat, clock, hbrate=2.0, **kwargs):
"""Fallback blocking event loop for transports that doesn't support AIO."""
RUN = bootsteps.RUN
on_task_received = obj.create_task_handler()
perform_pending_operations = obj.perform_pending_operations
...
consumer.on_message = on_task_received
consumer.consume()
obj.on_ready()
while blueprint.state == RUN and obj.connection:
...
try:
perform_pending_operations()
connection.drain_events(timeout=2.0)
except socket.timeout:
...
|
循环中主要功能是:
- 设定消息的消费函数on_message
- 使用while循环阻塞监听
- 使用connection.drain_events消费消息(在kombu的文章中有过介绍)
因为synloop会阻塞,所以需要设置step为last,确保在蓝图的最后启动。
Consumer-Blueprint实现任务调度
我们再查看celery的任务处理日志:
1
2
3
4
|
[2021-11-24 21:33:50,535: INFO/MainProcess] Received task: myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2]
[2021-11-24 21:33:50,535: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7fe6086ac280> (args:('myapp.add', 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', {'lang': 'py', 'task': 'myapp.add', 'id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'parent_id': None, 'argsrepr': '(16, 16)', 'kwargsrepr': '{}', 'origin': 'gen83110@192.168.5.28', 'reply_to': '63862dbb-9d82-3bdd-b7fb-03580941362a', 'correlation_id': 'e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2', 'hostname': 'celery@192.168.5.28', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [16, 16], 'kwargs': {}}, b'[[16, 16], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2021-11-24 21:33:50,536: DEBUG/MainProcess] Task accepted: myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2] pid:83086
[2021-11-24 21:33:50,537: INFO/ForkPoolWorker-8] Task myapp.add[e9bb4aa0-8280-443f-a5ed-3deb0a0b99c2] succeeded in 0.000271957000000711s: 32
|
可以发现celery的worker在主进程(MainProcess)中接收到task后,会派发给子进程(ForkPoolWorker-8)执行。
前面synloop的消费函数on_message实际上是Consumer(Blueprint)的create_task_handler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def create_task_handler(self, promise=promise):
strategies = self.strategies
on_unknown_message = self.on_unknown_message
on_unknown_task = self.on_unknown_task
on_invalid_task = self.on_invalid_task
callbacks = self.on_task_message
call_soon = self.call_soon
def on_task_received(message):
type_ = message.headers['task']
...
strategy = strategies[type_]
strategy(
message, payload,
promise(call_soon, (message.ack_log_error,)),
promise(call_soon, (message.reject_log_error,)),
callbacks,
)
...
return on_task_received
|
对于消息和任务的处理,celery提供了默认的执行策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# celery/worker/strategy.py:22
def default(task, app, consumer,
info=logger.info, error=logger.error, task_reserved=task_reserved,
to_system_tz=timezone.to_system, bytes=bytes,
proto1_to_proto2=proto1_to_proto2):
"""Default task execution strategy."""
...
# task event related
# (optimized to avoid calling request.send_event)
handle = consumer.on_task_request
...
Request = symbol_by_name(task.Request)
Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
revoked_tasks = consumer.controller.state.revoked
def task_message_handler(message, body, ack, reject, callbacks,
to_timestamp=to_timestamp):
....
req = Req(
message,
on_ack=ack, on_reject=reject, app=app, hostname=hostname,
eventer=eventer, task=task, connection_errors=connection_errors,
body=body, headers=headers, decoded=decoded, utc=utc,
)
...
info('Received task: %s', req)
...
handle(req)
return task_message_handler
|
default策略主要做了下面2件事:
- 创建请求对象Request
- 使用handle处理request对象
Request对象的执行是调用pool的执行方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
def execute_using_pool(self, pool, **kwargs):
"""Used by the worker to send this task to the pool."""
result = pool.apply_async(
trace_task_ret,
args=(self._type, task_id, self._request_dict, self._body,
self._content_type, self._content_encoding),
accept_callback=self.on_accepted,
timeout_callback=self.on_timeout,
callback=self.on_success,
error_callback=self.on_failure,
soft_timeout=soft_time_limit or task.soft_time_limit,
timeout=time_limit or task.time_limit,
correlation_id=task_id,
)
# cannot create weakref to None
self._apply_result = maybe(ref, result)
return result
|
这样远程的任务请求就派发给Pool进行执行, pool如何执行task同样以后再介绍。
Mingle-Step和Gossip-Step实现worker分布式协作
celery作为一款分布式任务调度框架,多个worker的协作由Mingle和Gossip两个step提供。我们先看Mingle-Step的日志:
1
2
3
4
|
[2021-12-12 13:37:56,632: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-12-12 13:37:56,632: INFO/MainProcess] mingle: searching for neighbors
[2021-12-12 13:37:57,674: INFO/MainProcess] mingle: all alone
...
|
Mingle-Step实现多个worker节点的同步通讯:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
def start(self, c):
self.sync(c)
def sync(self, c):
info('mingle: searching for neighbors')
replies = self.send_hello(c)
if replies:
info('mingle: sync with %s nodes',
len([reply for reply, value in replies.items() if value]))
[self.on_node_reply(c, nodename, reply)
for nodename, reply in replies.items() if reply]
info('mingle: sync complete')
else:
info('mingle: all alone')
|
可以看到Mingle启动后,发送hello消息,然后对其它节点的回应进行处理。hello的发送是这样的:
1
2
3
4
5
6
7
8
9
10
|
def send_hello(self, c):
inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
our_revoked = c.controller.state.revoked
replies = inspect.hello(c.hostname, our_revoked._data) or {}
replies.pop(c.hostname, None) # delete my own response
return replies
...
# celery/app/control.py
def hello(self, from_node, revoked=None):
return self._request('hello', from_node=from_node, revoked=revoked)
|
对于回应的主要处理就是对当前worker的LamportClock进行校正。:
1
2
3
4
|
def on_node_reply(self, c, nodename, reply):
...
c.app.clock.adjust(clock) if clock else c.app.clock.forward()
...
|
Gossip-Step的功能会复杂一些,不像Mingle是一次性的,它是一个持续的过程。下面是它的日志,清晰展示会持续的监听:
1
2
3
4
5
|
[2021-12-05 15:59:19,088: DEBUG/MainProcess] w2@bogon joined the party[2021-12-12 13:37:58,096: DEBUG/MainProcess] w2@bogon joined the party
[2021-12-12 14:52:49,259: INFO/MainProcess] missed heartbeat from w2@bogon
[2021-12-12 14:52:49,262: DEBUG/MainProcess] w2@bogon joined the party
...
[2021-12-12 16:10:54,112: DEBUG/MainProcess] w2@bogon left
|
Gossip是一种算法,又称流行病算法,其图示如下:
简单的说在Gossip算法中网络节点每次向自己关联的节点广播消息,直到网络中所有节点都收到消息。
celery的gossip处理消息的过程是创建自己的Consumer和定时器:
1
2
3
4
5
6
7
8
9
10
11
12
|
def get_consumers(self, channel):
# 定时处理worker激活事件
self.register_timer()
# 消息消费者
ev = self.Receiver(channel, routing_key='worker.#',
queue_ttl=self.heartbeat_interval)
return [Consumer(
channel,
queues=[ev.queue],
on_message=partial(self.on_message, ev.event_from_message),
no_ack=True
)]
|
定时器负责处理其它节点的活跃状态, 如果节点不活跃,将它标记为脏节点,进行节点丢失处理,然后移除节点:
1
2
3
4
5
6
7
8
9
|
def periodic(self):
workers = self.state.workers
dirty = set()
for worker in workers.values():
if not worker.alive:
dirty.add(worker)
self.on_node_lost(worker)
for worker in dirty:
workers.pop(worker.hostname, None)
|
消费的消息,又分成2种类型: 选举消息和其它消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
def on_message(self, prepare, message):
_type = message.delivery_info['routing_key']
try:
# 选举事件
handler = self.event_handlers[_type]
except KeyError:
pass
else:
return handler(message.payload)
# proto2: hostname in header; proto1: in body
hostname = (message.headers.get('hostname') or
message.payload['hostname'])
if hostname != self.hostname:
...
# 其它事件
_, event = prepare(message.payload)
self.update_state(event)
...
else:
self.clock.forward()
|
选举类的消息是处理选举消息和选举ack消息:
1
2
3
4
5
6
7
8
9
10
|
self.event_handlers = {
'worker.elect': self.on_elect,
'worker.elect.ack': self.on_elect_ack,
}
def on_elect(self, event):
...
def on_elect_ack(self, event):
...
|
其它事件主要是节点的上下线之类:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
self.state = c.app.events.State(
on_node_join=self.on_node_join,
on_node_leave=self.on_node_leave,
max_tasks_in_memory=1,
)
def on_node_join(self, worker):
debug('%s joined the party', worker.hostname)
self._call_handlers(self.on.node_join, worker)
def on_node_leave(self, worker):
debug('%s left', worker.hostname)
self._call_handlers(self.on.node_leave, worker)
|
小结
我们通过解析celery的两个Blueprint,了解到celery worker的启动流程包括建立和broker之间的AMQP协议连接,使用进程池/线程池/协程池方式处理任务,使用hello消息进行worker节点之间的LamportClock时钟校时,使用Gossip协议进行worker节点之间的通讯协作。在多进程情况下,每次的任务都先被主进程获取,然后分配给进程池中的子进程进行执行。
参考链接