python异步实现定时任务和周期任务的方法

作者:狡猾的皮球 时间:2021-11-11 19:01:45 

一. 如何调用


def f1(arg1, arg2):
 print('f1', arg1, arg2)

def f2(arg1):
 print('f2', arg1)

def f3():
 print('f3')

def f4():
 print('周期任务', int(time.time()))

timer = TaskTimer()
# 把任务加入任务队列
timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30执行
timer.join_task(f2, [3], timing=14) # 每天14:00执行
timer.join_task(f3, [], timing=15) # 每天15:00执行
timer.join_task(f4, [], interval=10) # 每10秒执行1次
# 开始执行(此时才会创建线程)
timer.start()

f1~f4是我们需要定时执行的函数。

首先创建TaskTimer对象(TaskTimer的代码在下面)。调用join_task函数,把需要执行的函数加入到任务队列。最后调用start,任务就开始执行了。

join_task参数:

fun:需要执行的函数

arg:fun的参数,如果没有就传一个空列表

interval:如果有此参数,说明任务是周期任务,单位为秒(注意interval最少5秒)

timing:如果有此参数,说明任务是定时任务,单位为时

注意:interval和timing只能选填1个

二. 源码


import datetime
import time
from threading import Thread
from time import sleep

class TaskTimer:
 __instance = None

def __new__(cls, *args, **kwargs):
   """
   单例模式
   """
   if not cls.__instance:
     cls.__instance = object.__new__(cls)
   return cls.__instance

def __init__(self):
   if not hasattr(self, 'task_queue'):
     setattr(self, 'task_queue', [])

if not hasattr(self, 'is_running'):
     setattr(self, 'is_running', False)

def write_log(self, level, msg):
   cur_time = datetime.datetime.now()
   with open('./task.log', mode='a+', encoding='utf8') as file:
     s = "[" + str(cur_time) + "][" + level + "]  " + msg
     print(s)
     file.write(s + "\n")

def work(self):

"""
   处理任务队列
   """
   while True:
     for task in self.task_queue:
       if task['interval']:
         self.cycle_task(task)
       elif task['timing']:
         self.timing_task(task)

sleep(5)

def cycle_task(self, task):
   """
   周期任务
   """
   if task['next_sec'] <= int(time.time()):
     try:
       task['fun'](*task['arg'])
       self.write_log("正常", "周期任务:" + task['fun'].__name__ + " 已执行")
     except Exception as e:
       self.write_log("异常", "周期任务:" + task['fun'].__name__ + " 函数内部异常:" + str(e))
     finally:
       task['next_sec'] = int(time.time()) + task['interval']

def timing_task(self, task):
   """
   定时任务
   """
   # 今天已过秒数
   today_sec = self.get_today_until_now()

# 到了第二天,就重置任务状态
   if task['today'] != self.get_today():
     task['today'] = self.get_today()
     task['today_done'] = False

# 第一次执行
   if task['first_work']:
     if today_sec >= task['task_sec']:
       task['today_done'] = True
       task['first_work'] = False
     else:
       task['first_work'] = False

# 今天还没有执行
   if not task['today_done']:
     if today_sec >= task['task_sec']: # 到点了,开始执行任务
       try:
         task['fun'](*task['arg'])
         self.write_log("正常", "定时任务:" + task['fun'].__name__ + " 已执行")
       except Exception as e:
         self.write_log("异常", "定时任务:" + task['fun'].__name__ + " 函数内部异常:" + str(e))
       finally:
         task['today_done'] = True
         if task['first_work']:
           task['first_work'] = False

def get_today_until_now(self):
   """
   获取今天凌晨到现在的秒数
   """
   i = datetime.datetime.now()
   return i.hour * 3600 + i.minute * 60 + i.second

def get_today(self):
   """
   获取今天的日期
   """
   i = datetime.datetime.now()
   return i.day

def join_task(self, fun, arg, interval=None, timing=None):
   """
   interval和timing只能存在1个
   :param fun: 你要调用的任务
   :param arg: fun的参数
   :param interval: 周期任务,单位秒
   :param timing: 定时任务,取值:[0,24)
   """
   # 参数校验
   if (interval != None and timing != None) or (interval == None and timing == None):
     raise Exception('interval和timing只能选填1个')

if timing and not 0 <= timing < 24:
     raise Exception('timing的取值范围为[0,24)')

if interval and interval < 5:
     raise Exception('interval最少为5')

# 封装一个task
   task = {
     'fun': fun,
     'arg': arg,
     'interval': interval,
     'timing': timing,
   }
   # 封装周期或定时任务相应的参数
   if timing:
     task['task_sec'] = timing * 3600
     task['today_done'] = False
     task['first_work'] = True
     task['today'] = self.get_today()
   elif interval:
     task['next_sec'] = int(time.time()) + interval

# 把task加入任务队列
   self.task_queue.append(task)

self.write_log("正常", "新增任务:" + fun.__name__)

def start(self):
   """
   开始执行任务
   返回线程标识符
   """
   if not self.is_running:
     thread = Thread(target=self.work)

thread.start()

self.is_running = True

self.write_log("正常", "TaskTimer已开始运行!")

return thread.ident

self.write_log("警告", "TaskTimer已运行,请勿重复启动!")

来源:https://blog.csdn.net/qq_39687901/article/details/81985767

标签:python,异步,定时,周期,任务
0
投稿

猜你喜欢

  • 基于layer.js实现收货地址弹框选择然后返回相应的地址信息

    2024-05-08 09:32:22
  • Python3.9.1中使用match方法详解

    2023-09-14 09:51:21
  • 浅谈javascript中的DOM方法

    2024-04-29 13:44:12
  • 详解vue-router 2.0 常用基础知识点之router.push()

    2024-04-09 10:49:35
  • tensorflow使用freeze_graph.py将ckpt转为pb文件的方法

    2023-01-31 15:31:05
  • vs10安装之后一些列问题

    2024-01-29 11:59:48
  • Django使用中间件解决前后端同源策略问题

    2022-09-05 10:33:32
  • 复习Python中的字符串知识点

    2022-09-13 07:32:49
  • django haystack实现全文检索的示例代码

    2021-04-08 05:00:59
  • Python爬虫番外篇之Cookie和Session详解

    2022-02-09 18:56:44
  • python批量读取txt文件为DataFrame的方法

    2021-09-29 12:04:21
  • Python在游戏中的热更新实现

    2022-04-05 14:10:15
  • python实现计算器功能

    2021-02-06 09:45:23
  • python将MongoDB里的ObjectId转换为时间戳的方法

    2022-08-08 04:33:01
  • 用python写一个定时提醒程序的实现代码

    2021-04-12 12:25:45
  • keras之权重初始化方式

    2023-06-01 19:04:44
  • IE7的web标准之道 Ⅲ

    2008-08-20 12:55:00
  • Python教程之基本运算符的使用(下)

    2021-04-29 20:32:37
  • Python机器学习应用之支持向量机的分类预测篇

    2023-08-29 20:42:55
  • 解决每次打开pycharm直接进入项目的问题

    2021-05-08 00:27:45
  • asp之家 网络编程 m.aspxhome.com