一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Python - Python使用signal定時結束AsyncIOScheduler任務的問題

Python使用signal定時結束AsyncIOScheduler任務的問題

2021-12-15 10:32臨淵(v:superz-han) Python

這篇文章主要介紹了Python使用signal定時結束AsyncIOScheduler任務,在使用aiohttp結合apscheduler的AsyncIOScheduler模擬定點并發的時候遇到兩個問題,針對每個問題給大家詳細介紹,需要的朋友可以參考下

在使用aiohttp結合apscheduler的AsyncIOScheduler模擬定點并發的時候遇到兩個問題

  1. 在調度器scheduler.start()后,程序直接退出(在Jupiter中任務可以正常啟動)
  2. 如何在指定時間調用scheduler.shutdown()? (因為程序直接退出了)

原調試代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from datetime import datetime, timedelta
 
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()
 
async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()
async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)
 
if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10個任務
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()

Google后發現AsyncIOScheduler的使用需要在scheduler啟動后,需要自己調用asyncio.get_event_loop().run_forever()來啟動協程任務。
但是一旦run_forever()則就會阻塞至死。除非有KeyboardInterrupt, SystemExit等異常或者強殺來停止其運行。
此時想到使用Python的signal來定時發送信號,修改后程序如下,可以正常延遲停止(感覺有點像模擬Go的defer)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio
 
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 
async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()
 
async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()
 
async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)
 
if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10個任務
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()
    signal.alarm(20# 20秒后終止程序
    asyncio.get_event_loop().run_forever()  # 永遠運行

到此這篇關于Python使用signal定時結束AsyncIOScheduler任務的文章就介紹到這了,更多相關Python定時結束AsyncIOScheduler任務內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://www.cnblogs.com/superhin/p/15060818.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 狠狠色狠狠色综合系列 | 亚洲国产一区二区三区a毛片 | 久草热8精品视频在线观看 久草草在线视视频 | 亚洲国产一区二区三区a毛片 | 男人天堂网页 | 精品午夜寂寞黄网站在线 | 国产自拍资源 | 成人亚洲欧美综合 | 欧美日韩中文字幕一区二区高清 | 国产精品亚洲专区在线播放 | 日韩欧美视频二区 | 国产精品成人在线播放 | 特级毛片全部免费播放器 | se01在线看片 | 免费理伦片在线观看全网站 | 男人j放进女人的p免费看视频 | 国产视频在线一区 | 青草热久精品视频在线观看 | 天天av天天翘天天综合网 | 91精品国产免费久久国语蜜臀 | 亚洲AV精品无码喷水直播间 | 精品夜夜澡人妻无码AV蜜桃 | 喷奶水榨乳ova动漫无修 | 97se狠狠狠狠狼亚洲综合网 | 亚洲精品国产自在现线最新 | 调教肉文 | 日韩大片在线播放 | 亚洲精品资源 | 国产免费午夜高清 | 国内自拍视频在线观看 | 国语自产拍在线观看7m | 午夜国产| 天天澡夜夜澡狠狠澡 | 人禽l交视频在线播放 视频 | 爽爽窝窝午夜精品一区二区 | 特黄视频免费看 | 免费看又黄又爽又猛的视频软件- | 大胆私拍模特国模377 | 日韩激情视频在线观看 | 含羞草国产亚洲精品岁国产精品 | 福利三区 |