返回顶部
首页 > 资讯 > 后端开发 > Python >Python使用signal定时结束AsyncIOScheduler任务的问题
  • 418
分享到

Python使用signal定时结束AsyncIOScheduler任务的问题

Python 官方文档:入门教程 => 点击学习

摘要

在使用aioHttp结合apscheduler的AsynciOScheduler模拟定点并发的时候遇到两个问题 在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启

在使用aioHttp结合apscheduler的AsynciOScheduler模拟定点并发的时候遇到两个问题

  1. 在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)
  2. 如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了)

原调试代码如下:


from datetime import datetime, timedelta

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()

async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()
async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)

if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10个任务
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()

Google后发现AsyncIOScheduler的使用需要在scheduler启动后,需要自己调用asyncio.get_event_loop().run_forever()来启动协程任务。
但是一旦run_forever()则就会阻塞至死。除非有KeyboardInterrupt, SystemExit等异常或者强杀来停止其运行。
此时想到使用python的signal来定时发送信号,修改后程序如下,可以正常延迟停止(感觉有点像模拟Go的defer)。


# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()

async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()

async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)

if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10个任务
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()
    signal.alarm(20)  # 20秒后终止程序
    asyncio.get_event_loop().run_forever()  # 永远运行

到此这篇关于Python使用signal定时结束AsyncIOScheduler任务的文章就介绍到这了,更多相关Python定时结束AsyncIOScheduler任务内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: Python使用signal定时结束AsyncIOScheduler任务的问题

本文链接: https://lsjlt.com/news/11165.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作