返回顶部
首页 > 资讯 > 后端开发 > Python >如何使用Python Celery动态添加定时任务
  • 748
分享到

如何使用Python Celery动态添加定时任务

2023-07-06 11:07:37 748人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

本篇内容介绍了“如何使用python Celery动态添加定时任务”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、背景实际工作中

本篇内容介绍了“如何使用python Celery动态添加定时任务”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

    一、背景

    实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时脚本

    通过celery 实现调度主要思想是 通过引入中间人redis,启动 worker 进行任务执行 ,celery-beat进行定时任务数据存储

    二、Celery动态添加定时任务的官方文档

    celery文档:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    celery 自定义调度类说明:

    自定义调度器类可以在命令行中指定(--scheduler参数)

    Django-celery-beat文档 : Https://pypi.org/project/djanGo-celery-beat/

    关于django-celery-beat 插件的说明:

    此扩展使您能够将定期任务计划存储在数据库中,可以从 Django 管理界面管理周期性任务,您可以在其中创建、编辑和删除周期性任务以及它们应该运行的频率

    三、celery简单实用

    3.1 基础环境配置

    1. 安装最新版本的Django

    pip3 install django #当前我安装的版本是 3.0.6

    创建项目

    django-admin startproject typeideadjango-admin startapp blog

    安装 celery

    pip3 install django-celerypip3 install -U Celery pip3 install "celery[libRabbitMQ,Redis,auth,msgpack]" pip3 install django-celery-beat # 用于动态添加定时任务pip3 install django-celery-resultspip3 install redis
    3.2 测试使用Celery应用

    1. 创建blog目录、新建task.py

    首先在Django项目中创建一个blog文件夹,并且在blog文件夹下创建tasks.py模块, 如下:

    如何使用Python Celery动态添加定时任务

    tasks.py代码如下:

    #!/usr/bin/env Python# -*- coding: UTF-8 -*- """#File: tasks.py#Time: 2022/3/30 2:26 下午#Author: julius"""from celery import Celery # 使用redis做为brokerapp = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0') # 创建任务函数@app.taskdef my_task():    print('任务正在执行...')

    Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。my_task函数是我们编写的一个任务函数, 通过加上装饰器app.task, 将其注册到broker的队列中。

    2. 启动redis、创建worker

    现在我们在创建一个worker, 等待处理队列中的任务。

    进入项目的根目录,执行命令: celery -A celery_tasks.tasks worker -l info

    如何使用Python Celery动态添加定时任务

    3. 调用任务

    下面来测试一下功能,创建一个任务,加入任务队列中,提供worker执行。

    进入python终端, 执行如下代码:

    $ python manage.py shell>>> from blog.tasks import my_task>>> my_task.delay()<AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

    调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

    4. 查看结果

    在worker的终端查看任务执行情况,可以看到已经收到83484dfe-f729-417b-8e51-6c7ae32a1377 任务,并打印了任务执行信息

    如何使用Python Celery动态添加定时任务

    5. 存储并查看任务执行状态

    把任务执行结果赋值给ret,然后调用result() 会产生 DisabledBackend 报错,可见没有配置后端存储的时候并不能保存任务执行的状态信息,下一节我们会讲到如何配置backend保存任务执行结果

    $ python manage.py shell>>> from blog.tasks import my_task>>> ret=my_task.delay()>>> ret.result()

    如何使用Python Celery动态添加定时任务

    四、配置backend存储任务执行结果

    如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:sqlAlchemy、Django ORM、Memcached、 Redis、rpc (RabbitMQ/AMQP)。

    1. 添加backend参数

    在本例中我们使用Redis作为存储结果的方案,通过Celery的backend参数来设定任务结果存储地址。我们将tasks模块修改如下:

    from celery import Celery # 使用redis作为broker以及backendapp = Celery('celery_tasks.tasks',             broker='redis://127.0.0.1:6379/8',             backend='redis://127.0.0.1:6379/9') # 创建任务函数@app.taskdef my_task(a, b):    print("任务函数正在执行....")    return a + b

    给Celery增加了backend参数,指定redis作为结果存储,并将任务函数修改为两个参数,并且有返回值。

    2. 调用任务/查看任务执行结果

    下面再来执行调用一下这个任务看看。

    $ python manage.py shell>>> from blog.tasks import my_task>>> res=my_task.delay(10,40)>>> res.result50>>> res.failed()False

    再来看看worker的执行情况,如下:

    如何使用Python Celery动态添加定时任务

    可以看到celery任务已经执行成功了。

    但是这只是一个开始,下一步要看看如何添加定时的任务。

    四、优化Celery目录结构

    上面直接将Celery的应用创建、配置、tasks任务全部写在了一个文件,这样在后面项目越来越大,也是不方便的。下面来拆分一下,并且添加一些常用的参数。

    基本结构如下

    如何使用Python Celery动态添加定时任务

    $ vim typeidea/celery.py (Celery应用文件)

    #!/usr/bin/env python# -*- coding: UTF-8 -*- """#File: celery.py#Time: 2022/3/30 12:25 下午#Author: julius"""import osfrom celery import Celeryfrom blog import celeryconfigproject_name='typeidea'# set the default django setting module for the 'celery' programos.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings')app = Celery(project_name) app.config_from_object('django.conf:settings') app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python# -*- coding: UTF-8 -*- """#File: celeryconfig.py#Time: 2022/3/30 2:54 下午#Author: julius"""# 设置结果存储from typeidea import settingsimport os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'# 设置代理人brokerBROKER_URL = 'redis://127.0.0.1:6379/1'# celery 的启动工作数量设置CELERY_WORKER_CONCURRENCY = 20# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。CELERYD_PREFETCH_MULTIPLIER = 20# 非常重要,有些情况下可以防止死CELERYD_FORCE_EXECV = True# celery 的 worker 执行多少个任务后进行重启操作CELERY_WORKER_MAX_TASKS_PER_CHILD = 100# 禁用所有速度限制,如果网络资源有限,不建议开足马力。CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = FalseCELERY_TIMEZONE = settings.TIME_ZONEDJANGO_CELERY_BEAT_TZ_AWARE = FalseCELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

    vim blog/tasks.py (tasks 任务文件)

    import timefrom blog.celery import app # 创建任务函数@app.taskdef my_task(a, b, c):    print('任务正在执行...')    print('任务1函数休眠10s')    time.sleep(10)    return a + b + c

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat

    2.在项目的 settings 文件配置 django-celery-beat

    INSTALLED_APPS = [    'blog',    'django_celery_beat',    ...] # Django设置时区LANGUAGE_CODE = 'zh-hans'  # 使用中国语言TIME_ZONE = 'Asia/Shanghai'  # 设置Django使用中国上海时间# 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用# 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。USE_TZ = False

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    如何使用Python Celery动态添加定时任务

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os from celery import Celery from blog import celeryconfig # 为celery 设置环境变量os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")# 创建celery appapp = Celery('blog')# 从单独的配置模块中加载配置app.config_from_object(celeryconfig) # 设置app自动加载任务app.autodiscover_tasks([    'blog',])

    配置 celeryconfig.py

    # 设置结果存储from typeidea import settingsimport os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'# 设置代理人brokerBROKER_URL = 'redis://127.0.0.1:6379/1'# celery 的启动工作数量设置CELERY_WORKER_CONCURRENCY = 20# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。CELERYD_PREFETCH_MULTIPLIER = 20# 非常重要,有些情况下可以防止死锁CELERYD_FORCE_EXECV = True# celery 的 worker 执行多少个任务后进行重启操作CELERY_WORKER_MAX_TASKS_PER_CHILD = 100# 禁用所有速度限制,如果网络资源有限,不建议开足马力。CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = FalseCELERY_TIMEZONE = settings.TIME_ZONEDJANGO_CELERY_BEAT_TZ_AWARE = FalseCELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

    编写任务 tasks.py

    import timefrom celery import Celeryfrom blog.celery import app # 使用redis做为broker# app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1') # 创建任务函数@app.taskdef my_task(a, b, c):    print('任务正在执行...')    print('任务1函数休眠10s')    time.sleep(10)    return a + b + c @app.taskdef my_task2():    print("任务2函数正在执行....")    print('任务2函数休眠10s')    time.sleep(10)

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info

    如何使用Python Celery动态添加定时任务

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    如何使用Python Celery动态添加定时任务

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule>>> schedule, created = IntervalSchedule.objects.get_or_create( ...       every=10, ...       period=IntervalSchedule.SECONDS, ...  )>>> IntervalSchedule.objects.all()<QuerySet [<IntervalSchedule: every 10 seconds>]>

    创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',)<PeriodicTask: my_task2: every 10 seconds>

    beat 调度服务日志显示如下:

    如何使用Python Celery动态添加定时任务

    worker 服务日志显示如下:

    如何使用Python Celery动态添加定时任务

    创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=JSON.dumps([10,20,30]))<PeriodicTask: my_task: every 10 seconds>

    beat 调度服务日志结果:

    如何使用Python Celery动态添加定时任务

    worker 服务日志结果:

    如何使用Python Celery动态添加定时任务

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。

    6.2 创建一个不带参数的周期性间隔任务

    初始化 crontab 的调度对象

    >>> import pytz>>> schedule, _ = CrontabSchedule.objects.get_or_create(... minute='*',... hour='*',... day_of_week='*',... day_of_month='*',... timezone=pytz.timezone('Asia/Shanghai')... )

    创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)

    beat 调度服务执行结果

    如何使用Python Celery动态添加定时任务

    worker 执行服务结果

    如何使用Python Celery动态添加定时任务

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()<ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>>>> PeriodicTask.objects.get(name='my_task2_crontab')<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>>>> for task in PeriodicTask.objects.all():...     print(task.id)... 113>>> PeriodicTask.objects.get(id=13)<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>>>> PeriodicTask.objects.get(name='my_task2_crontab')<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

    控制台实际操作记录

    如何使用Python Celery动态添加定时任务

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)>>> my_task2_crontab.enabledTrue>>> my_task2_crontab.enabled=False>>> my_task2_crontab.save()

    查看worker输出:

    如何使用Python Celery动态添加定时任务

    可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabledFalse>>> my_task2_crontab.enabled=True>>> my_task2_crontab.save()

    查看worker输出:

    如何使用Python Celery动态添加定时任务

    可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name='my_task2_crontab').delete()>>> PeriodicTask.objects.get(name='my_task2_crontab')Traceback (most recent call last):  File "<console>", line 1, in <module>  File "/Users/julius/PyCharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method    return getattr(self.get_queryset(), name)(*args, **kwargs)  File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get    raise self.model.DoesNotExist(django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    “如何使用Python Celery动态添加定时任务”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

    --结束END--

    本文标题: 如何使用Python Celery动态添加定时任务

    本文链接: https://lsjlt.com/news/358014.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    猜你喜欢
    • 如何使用Python Celery动态添加定时任务
      本篇内容介绍了“如何使用Python Celery动态添加定时任务”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、背景实际工作中...
      99+
      2023-07-06
    • 怎么用Python Celery动态添加定时任务
      一、背景实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时脚本通过celery 实现调度主要思想是 通过引入中间人redis,启动 worker 进行任务执行 ,celery-beat进行定时任务数据存储二、...
      99+
      2023-05-14
      Python Celery
    • celery动态添加任务
      celery是一个基于Python的分布式调度系统,文档在这 ,最近有个需求,想要动态的添加任务而不用重启celery服务,找了一圈没找到什么好办法(也有可能是文档没看仔细),所以只能自己实现囉 为celery动态添加任务,首先我想到的是...
      99+
      2023-01-31
      动态 celery
    • Python Celery动态添加定时任务生产实践指南
      目录一、背景二、Celery动态添加定时任务的官方文档三、celery简单实用3.1 基础环境配置3.2 测试使用Celery应用四、配置backend存储任务执行结果 四...
      99+
      2024-04-02
    • 关于Django使用 django-celery-beat动态添加定时任务的方法
      版本信息 # 插件安装 Django==2.2.2 django-celery-beat==2.1.0 django-redis==4.8.0 mysqlclient==2.0...
      99+
      2024-04-02
    • 使用celery怎么动态设置定时任务
      使用celery怎么动态设置定时任务?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。celery的beat运行过程。上图是beat的主要组成结构,beat中包含了...
      99+
      2023-06-08
    • celery实现动态设置定时任务
      本文实例为大家分享了celery动态设置定时任务的具体代码,供大家参考,具体内容如下 首先celery是一种异步任务队列,如果还不熟悉这个开源软件的请先看看官方文档,快速入门。 这里...
      99+
      2024-04-02
    • django怎么动态添加定时任务
      在Django中,可以使用celery来实现动态添加定时任务。首先,需要安装Celery:```shellpip install c...
      99+
      2023-09-26
      django
    • SpringBoot 实现动态添加定时任务功能
      目录代码结构1. 配置类2. 定时任务类型枚举3. 实际执行任务实现类4. 定时任务包装器5. 任务注册器 (核心)6. 使用最后最近的需求有一个自动发布的功能, 需要做到每次提交都...
      99+
      2024-04-02
    • Spring动态添加定时任务的实现思路
      一、背景 在工作中,有些时候我们有些定时任务的执行可能是需要动态修改的,比如: 生成报表,有些项目配置每天的8点生成,有些项目配置每天的10点生成,像这种动态的任务执行时间,在不考虑...
      99+
      2024-04-02
    • Spring动态添加定时任务的实现方法
      本篇内容主要讲解“Spring动态添加定时任务的实现方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Spring动态添加定时任务的实现方法”吧!一、背景在工作中,有些时候我们有些定时任务的执行...
      99+
      2023-06-20
    • 怎么用SpringBoot实现动态添加定时任务功能
      这篇“怎么用SpringBoot实现动态添加定时任务功能”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“怎么用SpringBo...
      99+
      2023-06-29
    • linux如何利用crontab添加定时任务详解
      前言 linux 系统是由 crond这个系统服务来控制的。Linux 系统上面原本就有非常多的计划性工作,因此这个系统服务是默认启动的。 crontab命令用于设置周期性被执行的指令。该命令从标准输入设备读取指令,并将...
      99+
      2022-06-04
      linux定时任务crontab linux查看crontab任务 linux查看定时任务
    • SpringBoot动态定时任务如何实现
      这篇文章主要介绍了SpringBoot动态定时任务如何实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot动态定时任务如何实现文章都会有所收获,下面我们一起来看看吧。 执行定时任务的...
      99+
      2023-07-05
    • SpringBoot如何设置动态定时任务
      这篇文章主要介绍了SpringBoot如何设置动态定时任务的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot如何设置动态定时任务文章都会有所收获,下面我们一起来看看吧。之前写过文章记录怎么在Sp...
      99+
      2023-07-02
    • python定时任务apscheduler如何使用
      这篇文章主要介绍了python定时任务apscheduler如何使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇python定时任务apscheduler如何使用文章都会有所收获,下面我们一起来看看吧。安装p...
      99+
      2023-06-29
    • 如何使用spring-task定时任务动态配置修改执行时间
      小编给大家分享一下如何使用spring-task定时任务动态配置修改执行时间,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!spring-task定时任务动态配置修改执行时间因项目需要,几个定时任务需要人为动态设置执行时间,...
      99+
      2023-06-25
    • 如何实现SpringBoot+Quartz+Maven+MySql动态定时任务
      下面一起来了解下如何实现SpringBoot+Quartz+Maven+MySql动态定时任务,相信大家看完肯定会受益匪浅,文字在精不在多,希望如何实现SpringBoot+Quartz+Maven+MyS...
      99+
      2024-04-02
    • 如何通过RabbitMq实现动态定时任务详解
      目录一、需求背景二、方案思考(1)需求大致分析(2)可尝试的方案三、通过RabbitMQ实现延时任务并间接实现动态定时任务。(1)通过死信的方式实现延时信息消费(2)通过MQ延时插件...
      99+
      2024-04-02
    • 如何使用 gocql 动态添加查询参数?
      对于一个Golang开发者来说,牢固扎实的基础是十分重要的,编程网就来带大家一点点的掌握基础知识点。今天本篇文章带大家了解《如何使用 gocql 动态添加查询参数?》,主要介绍了,希望对大家的知识积...
      99+
      2024-04-05
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作