What is Celery

Celery communicates via messages, usually with a broker mediating between clients and workers. To start a task, the client sends a message to the queue; the broker delivers that message to a worker, and the worker executes the task the broker assigned to it.

Celery can run multiple workers and brokers, which improves availability and allows horizontal scaling.

Celery is written in Python, but its protocol can be implemented in any language.

Adapted from the official documentation: https://www.celerycn.io/ru-men/celery-jian-jie

Installing Celery

Celery supports several result backends; here we choose Redis. Install it in a virtual environment as follows:

python3 -m venv env
source env/bin/activate
pip install -U "celery[redis]"

After installation, the project's packages look like this:

amqp==2.5.1
billiard==3.6.1.0
celery==4.3.0
importlib-metadata==0.23
kombu==4.6.4
more-itertools==7.2.0
pytz==2019.2
redis==3.3.8
vine==1.3.0
zipp==0.6.0

Installing "celery[redis]" pulls in both celery and the redis package (the Python client, not the Redis server itself).
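If you later want to confirm from code which versions actually landed in the environment, here is a stdlib-only sketch using importlib.metadata (in the standard library since Python 3.8; the importlib-metadata entry pinned above is the backport of the same API):

```python
from importlib import metadata

def pkg_version(name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(name)
    except metadata.PackageNotFoundError:
        return None

if __name__ == "__main__":
    for pkg in ("celery", "redis"):
        print(pkg, pkg_version(pkg))
```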

Using Celery tasks

1. Start a Redis container

docker run -it -p 6379:6379 redis:5.0.4-alpine

2. Write the Celery project

The project directory structure is as follows:

├── project0
│   ├── __init__.py
│   ├── app.py
│   ├── env
│   ├── settings.py
│   └── tasks.py

Declare the Celery app in app.py:

# -*- coding:utf-8 -*-
from celery import Celery

app = Celery('demo')
app.config_from_object('settings')
if __name__ == '__main__':
    app.start()

Configure the broker, result backend, and other settings in settings.py:

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_TIMEZONE = 'Asia/Shanghai'   # timezone

CELERY_IMPORTS = (
     'tasks',
)
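As an aside, Celery 4 also understands new-style lowercase setting names; the uppercase names above are the older convention and still work with config_from_object. A rough lowercase equivalent of the same settings.py (assuming a one-to-one mapping; the two styles should not be mixed in one config):

```python
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
accept_content = ['json']
task_serializer = 'json'
result_serializer = 'json'
result_expires = 60 * 60 * 24
timezone = 'Asia/Shanghai'

imports = (
    'tasks',
)
```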

Define the hello task in tasks.py:

import time

from app import app

@app.task
def hello(word):
    print("hello:", time.ctime(), word)
    return "hello,{}".format(word)

3. Start the celery worker

(env) ➜  project0 celery -A app worker -l info


celery@YoodeMac-mini.local v4.3.0 (rhubarb)

Darwin-18.6.0-x86_64-i386-64bit 2019-09-24 16:20:55

[config]
.> app:         demo:0x108085160
.> transport:   redis://localhost:6379/0
.> results:     redis://localhost:6379/1
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.hello

[2019-09-24 16:20:56,134: INFO/MainProcess] Connected to redis://localhost:6379/0
[2019-09-24 16:20:56,159: INFO/MainProcess] mingle: searching for neighbors
[2019-09-24 16:20:57,199: INFO/MainProcess] mingle: all alone
[2019-09-24 16:20:57,236: INFO/MainProcess] celery@YoodeMac-mini.local ready.

As the output shows, tasks.hello has been registered in the [tasks] list.

4. Run a task

With the worker running, you can test the task directly from a new terminal:

(env) ➜  project0 python
Python 3.7.1 (v3.7.1:260ec2c36a, Oct 20 2018, 03:13:28)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>>
>>> t = tasks.hello.delay("world")

At the same time, the worker log shows the task executing:

...
[2019-09-24 16:24:05,726: WARNING/ForkPoolWorker-8] hello:
[2019-09-24 16:24:05,727: WARNING/ForkPoolWorker-8] Tue Sep 24 16:24:05 2019
[2019-09-24 16:24:05,728: WARNING/ForkPoolWorker-8] world
...

Back in the Python shell, you can inspect the task's state and result:

>>> t
<AsyncResult: d8f80147-c2c8-419e-ac9b-e317e7533cf1>
>>> t.get()
'hello,world'
>>> t.info
'hello,world'
>>> t.result
'hello,world'
>>> t.task_id
'd8f80147-c2c8-419e-ac9b-e317e7533cf1'
>>> t.successful()
True
>>> t.state
'SUCCESS'
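The AsyncResult handle behaves much like the standard library's concurrent.futures.Future: get() blocks for the result the way future.result() does, and successful() roughly corresponds to a completed future raising no exception. A stdlib-only analogy of this handle pattern (an illustration only, not Celery code):

```python
from concurrent.futures import ThreadPoolExecutor

def hello(word):
    return "hello,{}".format(word)

with ThreadPoolExecutor() as pool:
    f = pool.submit(hello, "world")            # like tasks.hello.delay("world")
    result = f.result()                        # like t.get() -- blocks until done
    ok = f.done() and f.exception() is None    # like t.successful()

print(result, ok)
```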

The experiment above shows that Celery is a distributed producer-consumer model: worker processes consume (execute tasks), producer processes produce (submit tasks), and Redis handles both message delivery and result storage.
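That producer-consumer model can be sketched with nothing but the standard library, using a queue.Queue in place of Redis and a thread in place of the worker process (a toy illustration of the pattern, not how Celery is implemented):

```python
import queue
import threading

broker = queue.Queue()   # stands in for the Redis broker (db 0)
results = {}             # stands in for the Redis result backend (db 1)

def worker():
    """Consume task messages from the broker until a sentinel arrives."""
    while True:
        task_id, word = broker.get()
        if task_id is None:          # sentinel: shut the worker down
            break
        results[task_id] = "hello,{}".format(word)
        broker.task_done()

t = threading.Thread(target=worker)
t.start()

broker.put(("task-1", "world"))      # producer submits a task, like delay("world")
broker.put((None, None))             # stop the worker
t.join()
print(results["task-1"])
```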