irpas技术客

【Celery】Celery的简易部署和应用_大佬橙1215_celery部署

网络 2007

Celery的简易应用


文章目录 Celery的简易应用前言一、创建一个Celery App二、部署Celery1. 创建所需的用户和文件夹2. 创建 celeryd.config 配置文件3. 创建 .service 服务脚本(1) 创建路由任务 celeryd.service(2) 创建周期任务 celery_beat.service(3) 启动 三. 任务(1) 路由任务(routing task)(2) 周期任务(periodic task)


前言

?? Celery 是一个简单、灵活、可靠的分布式系统,可以处理大量的消息,同时提供维护系统所需的工具。它是一个专注于处理 实时任务 和 定时任务 的的任务队列。 ?? 本篇文章使用 Ubuntu20.04 系统 ,Python3.8 版本。使用的是 Celery4.4.7 版本,用 Redis 作为消息队列存储任务(可以用更好的RabbitMQ)。 ?? 主要目的是快速部署和简易应用。更多操作:传送门 - Celery官方文档


一、创建一个Celery App

假设在下面目录里创建一个 Celery App,目录结构如下

目录 /home/ubuntu/app/Celery

# app.py from celery import Celery # include代表任务文件 app = Celery(broker='redis://localhost:6379/0', backend='redis://localhost:6379/1', # 导入所执行的任务文件 include=['routing_task.tasks', 'periodic_task.tasks']) # 配置时区,定时任务要用 app.conf.update( result_expires=3600, timezone='Asia/Shanghai' ) # 定时任务调度 app.conf.beat_schedule = { # 任务名 'this is periodic_tasks': { # 任务调度位置 'task': 'periodic_tasks.tasks.abbc', # 调度时间 'schedule': timedelta(seconds=5), # 'schedule': crontab(minute=53, hour=14), # 任务参数 'args': (16, 16), }, } # routing_task/tasks.py from app import app @app.task() def add(x, y): return x + y if __name__ == "__main__": print(add.delay(1, 3).result)

此时,我们可以在当前目录下,运行:

celery -A app worker --loglevel=INFO

就可以正常运行Celery了。 执行 python tasks.py,将add任务推入redis消息中间件后Celery执行它并返回处理结果。


二、部署Celery

使用 .service 守护进程来运行 Celery

1. 创建所需的用户和文件夹 # 创建 用户、组 sudo useradd celery sudo groupadd celery sudo gpasswd -a celery celery # 创建日志目录 sudo mkdir /var/log/celery sudo chmod 0755 /var/log/celery/ sudo chown celery:celery /var/run/celery/ # 创建pid文件夹 sudo mkdir /run/celery sudo chmod 0755 /run/celery/ 2. 创建 celeryd.config 配置文件

配置文件路径:/etc/celery/celeryd.conf,没有就创建一个

# 节点的启动名称 CELERYD_NODES="work" # 或者你可以写三个: #CELERYD_NODES="w1 w2 w3" # celery的绝对路径,还是别用相对路径了,避免出错。 CELERY_BIN="/usr/local/bin/celery" #CELERY_BIN="/virtualenvs/def/bin/celery" # 这里写Celery的应用实例 CELERY_APP="app" # 或者你想写一个完整版 #CELERY_APP="Celery.app:app" # 启动多个 CELERYD_MULTI="multi" # worker的参数 , 启动8个 CELERYD_OPTS="--time-limit=300 --concurrency=8" # celerybeat调度器路径 CELERYBEAT_OPTS="--schedule=/var/run/celery/celerybeat-schedule" # 下面就是celery的pid路径,log日志了。 # - %n will be replaced with the first part of the nodename. # - %I will be replaced with the current child process index # and is important when using the prefork pool to avoid race conditions. CELERYD_PID_FILE="/var/run/celery/%n.pid" CELERYD_LOG_FILE="/var/log/celery/%n%I.log" CELERYD_LOG_LEVEL="INFO" # 设置celerybeat的pid和日志文件路径 CELERYBEAT_PID_FILE="/var/run/celery/beat.pid" CELERYBEAT_LOG_FILE="/var/log/celery/beat.log" 3. 创建 .service 服务脚本 (1) 创建路由任务 celeryd.service # 路径 vim /etc/systemd/system/celeryd.service [Unit] Description=Celery Service After=network.target [Service] Type=forking User=celery Group=celery # 配置文件的路径 EnvironmentFile=/etc/celery/celeryd.config # Celery的工作路径 WorkingDirectory=/home/ubuntu/app/Celery # 下面这写会被config里的参数自动替换掉 ExecStart=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi start $CELERYD_NODES \ --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \ --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS' ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait $CELERYD_NODES \ --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \ --loglevel="${CELERYD_LOG_LEVEL}"' ExecReload=/bin/sh -c '${CELERY_BIN} -A $CELERY_APP multi restart $CELERYD_NODES \ --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \ --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS' Restart=always [Install] WantedBy=multi-user.target (2) 创建周期任务 celery_beat.service [Unit] Description=Celery Beat Service After=network.target [Service] Type=simple User=celery Group=celery EnvironmentFile=/etc/celery/celeryd.config WorkingDirectory=/home/ubuntu/app/Celery ExecStart=/bin/sh -c '${CELERY_BIN} -A ${CELERY_APP} beat \ --pidfile=${CELERYBEAT_PID_FILE} \ --logfile=${CELERYBEAT_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} $CELERYBEAT_OPTS' Restart=always [Install] WantedBy=multi-user.target (3) 启动 # 重载所有服务文件 sudo systemctl daemon-reload # 启动 celeryd.service sudo systemctl start celeryd.service # 启动 celery_beat.service sudo systemctl start celery_beat.service 三. 任务 (1) 路由任务(routing task) # routing_task/tasks.py from app import app @app.task() def add(x, y): return f'this is routing task: {x - y}'

在所需要的场景下,通过

res = add.delay(1, 3)

来应用。

(2) 周期任务(periodic task)

在 app.conf.beat_schedule 中注册后, 重启 celery_beat 和 celeryd 两个服务后可以实现周期任务的添加应用。 注意:celery_beat 只当作任务调度,不执行任务。具体执行需要 celeryd 来完成,所以添加/修改任务后需要将两个 service 都重启才可以。 注册

# app.py app.conf.beat_schedule = { # 任务名 'this is periodic_tasks': { # 任务调度位置 'task': 'periodic_tasks.tasks.abbc', # 调度时间 'schedule': timedelta(seconds=5), # 'schedule': crontab(minute=53, hour=14), # 任务参数 'args': (16, 16), }, .... } # periodic_task/tasks.py from app import app @app.task() def abc(x, y): return f'this is periodic task: {x - y}'


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #celery部署 #celery #它是一个专注于处理 #实时任务 # #定时任务