# -*- coding: utf-8 -*-
# @Time : 2022/1/12 2:00 PM
# @Author : Ultipa
# @Email : [email protected]
# @File : ultipaSchedule.py
import threading
import time
import schedule
# Background runner for jobs registered with the ``schedule`` module.
def run_continuously(interval=2):
    """Run pending ``schedule`` jobs on a background daemon thread.

    The thread calls ``schedule.run_pending()`` and then pauses for
    ``interval`` seconds, repeating until it is told to stop.

    Missed runs are intentionally not made up: if a job is registered to
    run every minute and ``interval`` is one hour, the job runs only once
    at each wake-up, not 60 times.

    :param interval: seconds to pause between calls to
        ``schedule.run_pending()``.
    :return: a ``threading.Event``; call ``set()`` on it to stop the
        background thread.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        def run(self):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                # Pause by waiting on the stop event instead of sleeping:
                # the wait returns as soon as set() is called, so stopping
                # does not have to sit out the rest of the interval.
                cease_continuous_run.wait(interval)

    continuous_thread = ScheduleThread()
    # Daemon thread: it must not keep the interpreter alive on exit.
    continuous_thread.daemon = True
    continuous_thread.start()
    return cease_continuous_run
# Example job used by the demo under ``__main__``.
def background_job():
    """Print a fixed greeting line to standard output."""
    greeting = 'Hello from the background thread'
    print(greeting)
if __name__ == '__main__':
    # Demo: register the sample job to fire once per second.
    schedule.every().second.do(background_job)

    # Launch the background runner; keep the event it returns so the
    # runner can be stopped later.
    stop_event = run_continuously()

    # Stand-in for the main thread's real work.
    time.sleep(10)

    # Signal the background runner to finish.
    stop_event.set()