添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

celery执行异步任务

PS:特别需要注意创建工程的时候,实例化Celery所在的文件文件名称必须为celery.py,且包下有__init__.py文件,在实例化celery的时候必须将include参数添加正确

study_celery				# 工程文件夹
	- proj					# proj包
		- __init__.py		# 必须要含有__init__.py
		- celery.py			# 名称必须为celery.py,在此处编写定时任务代码
		- tasks.py			# 任务文件,可以有多个
	- add_task1.py			# 添加任务到backend中,执行异步任务
	- add_task2.py			# 添加任务到backend中,执行延迟任务

proj/celery.py

from celery import Celery
from datetime import timedelta
from celery.schedules import crontab
broker = 'redis://:123456@10.0.0.100:6379/1'
backend = 'redis://:123456@10.0.0.100:6379/2'
app = Celery(__name__, broker=broker, backend=backend, include=['proj.tasks'])
app.conf.beat_schedule = {
    'low-task': {
        'task': 'proj.tasks.add',  # celery_task下的task1的add方法
        'schedule': timedelta(seconds=15),  # 每15秒执行1次
        # 'schedule': crontab(hour=8, day_of_week=2),  # 每周二早八点
        'args': (300, 150),  # add所需要的参数
  1. 创建celery实例即app对象
  2. 执行存放任务broker地址和存放结果地址backend
  3. include参数是程序启动时倒入的模块列表,可以该处添加任务模块,便于worker能够对应相应的任务,列表中可以添加多个。
  4. 添加定时任务 beat

proj/tasks.py

from .celery import app
@app.task
def add(x, y):
    return x + y
@app.task
def mul(x, y):
    return x * y 
  1. 从celery中导入app对象
  2. 创建两个任务函数通过@app.task进行装饰

add_task1.py

from proj.tasks import add, mul
result = add.delay(1, 2)
result1 = mul.delay(8, 2)
print(result)
print(result1)

add_task2.py

from datetime import datetime, timedelta
from proj.tasks import add, mul
# 当前的utc时间向后延迟10s,默认使用utc时间
eta = datetime.utcnow() + timedelta(seconds=10)
# 10s后执行add和mul
ret1 = add.apply_async(args=(2, 3), eta=eta)
ret2 = mul.apply_async(args=(4, 5), eta=eta)
print(ret1)
print(ret2)
# 需要cd到proj所在的目录下
# linux启动
celery -A proj worker -l info
# windows启动
celery -A proj worker -l info -P eventlet
---console---
User information: uid=0 euid=0 gid=0 egid=0
  uid=uid, euid=euid, gid=gid, egid=egid,
 -------------- celery@6ae5fd398c10 v5.2.3 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-1160.59.1.el7.x86_64-x86_64-with-debian-9.11 2022-03-31 08:55:46
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         proj.celery:0x7f357cd743d0
- ** ---------- .> transport:   redis://:**@127.0.0.1:6379/1
- ** ---------- .> results:     redis://:**@127.0.0.1:6379/2
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
[tasks]
  . proj.tasks.add
  . proj.tasks.mul
[2022-03-31 08:55:46,902: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/1
[2022-03-31 08:55:46,904: INFO/MainProcess] mingle: searching for neighbors
[2022-03-31 08:55:47,914: INFO/MainProcess] mingle: all alone
[2022-03-31 08:55:47,925: INFO/MainProcess] celery@6ae5fd398c10 ready.

执行异步任务

# 需要cd到add_task1所在的目录下
python add_task.py
---console---
d6e5e56d-912f-40e6-b1ab-f08b1d59da98
af37bc9b-1cf9-43fe-87c4-fbd3f79d8066
---worker console---
[2022-03-31 08:58:09,033: INFO/MainProcess] Task proj.tasks.add[d6e5e56d-912f-40e6-b1ab-f08b1d59da98] received
[2022-03-31 08:58:09,048: INFO/MainProcess] Task proj.tasks.mul[af37bc9b-1cf9-43fe-87c4-fbd3f79d8066] received
[2022-03-31 08:58:09,061: INFO/ForkPoolWorker-1] Task proj.tasks.add[d6e5e56d-912f-40e6-b1ab-f08b1d59da98] succeeded in 0.02734477199999219s: 3
[2022-03-31 08:58:09,064: INFO/ForkPoolWorker-1] Task proj.tasks.mul[af37bc9b-1cf9-43fe-87c4-fbd3f79d8066] succeeded in 0.0013154429999531203s: 16

添加任务和worker先启动哪一个都可以,没有影响,当worker启动后,任务都会执行

执行延迟任务

# # 需要cd到add_task2.py所在的目录下
python add_task2.py
---console---
13e3a2ee-0bb5-4428-8a78-1fe29b499062
f68b434b-3957-466d-8294-5af66dfda468

此时worker所在的bash窗口等待片刻后输出

[2022-04-06 09:55:10,934: INFO/MainProcess] Task proj.tasks.add[13e3a2ee-0bb5-4428-8a78-1fe29b499062] received
[2022-04-06 09:55:10,956: INFO/MainProcess] Task proj.tasks.mul[f68b434b-3957-466d-8294-5af66dfda468] received
[2022-04-06 09:55:20,954: INFO/ForkPoolWorker-1] Task proj.tasks.mul[f68b434b-3957-466d-8294-5af66dfda468] succeeded in 0.005011890999867319s: 20
[2022-04-06 09:55:20,957: INFO/ForkPoolWorker-1] Task proj.tasks.add[13e3a2ee-0bb5-4428-8a78-1fe29b499062] succeeded in 0.0019722010001714807s: 5

执行定时任务

# 代码编写在proj/celery.py中
# worker启动后,新打开一个bash窗口输入
celery -A proj beat -l info 
---console---
celery beat v5.2.3 (dawn-chorus) is starting.
__    -    ... __   -        _
LocalTime -> 2022-04-06 09:58:22
Configuration ->
    . broker -> redis://:**@127.0.0.1:6379/1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2022-04-06 09:58:22,686: INFO/MainProcess] beat: Starting...
[2022-04-06 09:58:22,696: INFO/MainProcess] Scheduler: Sending due task low-task (proj.tasks.add)
[2022-04-06 09:58:37,699: INFO/MainProcess] Scheduler: Sending due task low-task (proj.tasks.add)

此时看到,beat所在的bash窗口每隔15s调动一次任务
打开worker所在的任务窗口如下

[2022-04-06 09:58:22,704: INFO/MainProcess] Task proj.tasks.add[c8b26f6a-f29f-43cf-9555-cd8746cbfb21] received
[2022-04-06 09:58:22,706: INFO/ForkPoolWorker-1] Task proj.tasks.add[c8b26f6a-f29f-43cf-9555-cd8746cbfb21] succeeded in 0.0015727149998383538s: 450
[2022-04-06 09:58:37,704: INFO/MainProcess] Task proj.tasks.add[82a406dc-c310-4bbb-ba83-a7d73bb2ae5e] received
[2022-04-06 09:58:37,707: INFO/ForkPoolWorker-1] Task proj.tasks.add[82a406dc-c310-4bbb-ba83-a7d73bb2ae5e] succeeded in 0.0025173189997076406s: 450
				
所有演示均基于Django2.0 celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。采用典型的生产者-消费者模型,主要由三部分组成: 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis、rabbitmq等作为broker 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程 存储结果的backend:执行结果存储在backend,默认也会存储在broker使用的MQ队列服务中,
这篇文章主要介绍了python基于celery实现异步任务周期任务定时任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 hello, 小伙伴们, 好久不更新了,这一次带来的是celerypython中的应用以及设置异步任务周期任务和定时任务的步骤,希望能给入坑的你带来些许帮助. 首先是对celery的介绍,Celery其实是一个专注于实时处理...
Celery 使用介绍 Celery 简单来说就是一个分布式消息队列。简单、灵活且可靠,能够处理大量消息,它是一个专注于实时处理的任务队列,同时也支持异步任务调度。Celery 不仅可以单机运行,也能够同时在多台机器上运行,甚至可以跨数据中心。 Celery 中比较关键的概念: worker: worker 是一个独立的进程,任务执行单元,它持续监视队列中是否有需要处理的任务; broker: ...
app = Celery(broker=broker, backend=backend, include=include) 绑定存放任务的仓库,绑定存放任务结构的仓库,绑定任务函数文件的路径 创建Celery对象 启动celery服务 celery服务启动指令: 1. 非windows celery worker -A celery_task -l info 2. wi...