celery是什么
Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)进行执行中间人(Broker)分配的任务。
Celery 可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。
Celery 是用Python编写的,但是协议可以通过任何语言进行实现。
转载自官方文档https://www.celerycn.io/ru-men/celery-jian-jie
安装 celery
celery支持多种backend,我们选择redis。安装命令如下:
1
2
3
|
python3 -m venv env
source env/bin/activate
pip install -U "celery[redis]"
|
完成后,项目的包如下:
1
2
3
4
5
6
7
8
9
10
|
amqp==2.5.1
billiard==3.6.1.0
celery==4.3.0
importlib-metadata==0.23
kombu==4.6.4
more-itertools==7.2.0
pytz==2019.2
redis==3.3.8
vine==1.3.0
zipp==0.6.0
|
使用"celery[redis]"
会同时安装celery和redis
使用 celery 任务
1. 启动redis容器
1
|
docker run -it -p 6379:6379 redis:5.0.4-alpine
|
2. 编写 celery 项目
项目目录结构如下:
1
2
3
4
5
6
|
├── project0
│ ├── __init__.py
│ ├── app.py
│ ├── env
│ ├── settings.py
│ └── tasks.py
|
app.py
中声明 celery
1
2
3
4
5
6
7
|
# -*- coding:utf-8 -*-
from celery import Celery
app = Celery('demo')
app.config_from_object('settings')
if __name__ == '__main__':
app.start()
|
使用 settings.py
配置backend等:
1
2
3
4
5
6
7
8
9
10
11
|
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区配置
CELERY_IMPORTS = (
'tasks',
)
|
tasks.py
中定义hello任务:
1
2
3
4
5
6
|
from app import app
@app.task
def hello(word):
print("hello:", time.ctime(), word)
return "hello,{}".format(word)
|
3. 启动 celery worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
(env) ➜ project0 celery -A app worker -l info
celery@YoodeMac-mini.local v4.3.0 (rhubarb)
Darwin-18.6.0-x86_64-i386-64bit 2019-09-24 16:20:55
[config]
.> app: demo:0x108085160
.> transport: redis://localhost:6379/0
.> results: redis://localhost:6379/1
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.hello
[2019-09-24 16:20:56,134: INFO/MainProcess] Connected to redis://localhost:6379/0
[2019-09-24 16:20:56,159: INFO/MainProcess] mingle: searching for neighbors
[2019-09-24 16:20:57,199: INFO/MainProcess] mingle: all alone
[2019-09-24 16:20:57,236: INFO/MainProcess] celery@YoodeMac-mini.local ready.
|
可见tasks.hello已经注册到任务列表[tasks]中。
4. 启动任务
worker启动后,可以直接在新终端测试任务:
1
2
3
4
5
6
7
|
(env) ➜ project0 python
Python 3.7.1 (v3.7.1:260ec2c36a, Oct 20 2018, 03:13:28)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>>
>>> t = tasks.hello.delay("world")
|
同时在worker中可以看到任务执行的情况:
1
2
3
4
5
|
...
[2019-09-24 16:24:05,726: WARNING/ForkPoolWorker-8] hello:
[2019-09-24 16:24:05,727: WARNING/ForkPoolWorker-8] Tue Sep 24 16:24:05 2019
[2019-09-24 16:24:05,728: WARNING/ForkPoolWorker-8] world
...
|
在终端中,可以看到task执行的状态和结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
>>> t
<AsyncResult: d8f80147-c2c8-419e-ac9b-e317e7533cf1>
>>> t.get()
'hello,world'
>>> t.info
'hello,world'
>>> t.result
'hello,world'
>>> t.task_id
'd8f80147-c2c8-419e-ac9b-e317e7533cf1'
>>> t.successful()
True
>>> t.state
'SUCCESS'
|
通过上述实验,可见celery是分布式的生产者-消费者模型。worker进程负责消费(执行任务),生产进程负责生产(启动任务),redis负责消息传递和结果保存。