1, Loop sleep
from datetime import datetime import time # Execute every n seconds def timer(n): while True: # TodoSomething print("The scheduled task is started....") print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) time.sleep(n) # 5s timer(5)
The implementation results are as follows:
The scheduled task is started.... 2022-07-16 15:31:16 The scheduled task is started.... 2022-07-16 15:31:21 The scheduled task is started.... 2022-07-16 15:31:26 The scheduled task is started.... 2022-07-16 15:31:31 The scheduled task is started.... 2022-07-16 15:31:36 . . .
The disadvantage of this method is that it can only perform tasks at fixed intervals, and it is always executing. If there is a scheduled task, it cannot be completed, such as calling me up at seven in the morning. And sleep is a blocking function, that is to say, you can't do anything during sleep.
2, Timer in threading module
from datetime import datetime from threading import Timer # Print time function def printTime(inc): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) t = Timer(inc, printTime, (inc,)) t.start() # 5s printTime(5)
The running results are as follows:
2022-07-16 15:37:30 2022-07-16 15:37:35 2022-07-16 15:37:40 2022-07-16 15:37:45 2022-07-16 15:37:50
The first parameter of Timer function is the time interval (in seconds), the second parameter is the name of the function to be called, and the third parameter is the parameter of the calling function (tuple)
3, Using the sched module
sched module is a built-in module in Python. It is a scheduling (delay processing mechanism). Every time you want to execute a task regularly, you must write a scheduling.
import sched import time from datetime import datetime # Initialize the scheduler class of the sched module # The first parameter is a function that can return a timestamp, and the second parameter can block before the timing arrives. schedule = sched.scheduler(time.time, time.sleep) # Functions triggered by periodic scheduling def printTime(inc): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) schedule.enter(inc, 0, printTime, (inc,)) # Default parameter 60s def main(inc=60): # The four parameters of enter are: interval event, priority (for sequencing when two events arrive at the same time), and the function triggered by the call, # Give the parameters of the trigger function (tuple form) schedule.enter(0, 0, printTime, (inc,)) schedule.run() # Output once in 10s main(10)
The implementation results are as follows:
2022-07-16 15:44:20 2022-07-16 15:44:30 2022-07-16 15:44:40 2022-07-16 15:44:50 2022-07-16 15:45:00
The steps of sched are as follows:
1. Generate scheduler:
s = sched.scheduler(time.time,time.sleep)
2. Add scheduling event
In fact, there are enter, enterabs, etc. Let's take enter as an example.
s.enter(x1,x2,x3,x4)
The four parameters are: interval event, priority (for sequencing when two events arrive at the same time), the function to be called and triggered, and the parameters to the trigger function (Note: it must be given in tuple, if there is only one parameter (xx,))
3. Operation
s.run()
Note that the sched module is not cyclic. After a schedule is executed, it will be Over. If you want to execute it again, please enter again
4, APScheduler timing framework
Finally found a way to wake me up regularly every day
APScheduler is a Python timed task framework, which is very convenient to use. It provides tasks based on date, fixed time interval and crontab type, and can persist tasks and run applications in daemon mode.
Using APScheduler requires installation
$ pip install apscheduler
First, let's take an example of calling me up at 6:30 every morning from Monday to Friday
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # Output time def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler scheduler = BlockingScheduler() scheduler.add_job(job, 'cron', day_of_week='1-5', hour=6, minute=30) scheduler.start()
What is the BlockingScheduler in the code?
BlockingScheduler is the scheduler in APScheduler. There are two commonly used schedulers in APScheduler, BlockingScheduler and BackgroundScheduler. When the scheduler is the only task to run in the application, use BlockingSchedule. If you want the scheduler to execute in the background, use BackgroundScheduler.
Four components of APScheduler
The four components of APScheduler are trigger, job store, executor and scheduler.
(1) Trigger
Including scheduling logic, each job has its own trigger, which is used to decide which job will run next. Except for their initial configuration, triggers are completely stateless
APScheduler has three built-in trigger s:
date: Trigger at a specific point in time interval: Fixed time interval trigger cron: Trigger periodically at a specific time
(2) Job store
Store the scheduled jobs. The default job storage is to simply save the jobs in memory, and other job storage is to save the jobs in the database. The data of a job is serialized when it is saved in the persistent job store, and deserialized when it is loaded. The scheduler cannot share the same job store.
APScheduler uses MemoryJobStore by default, and the DB storage scheme can be modified
(3) Actuator
They usually handle the operation of jobs by submitting the specified callable objects to a thread or entering the city pool in the job. When the job is completed, the actuator will notify the scheduler.
There are two most commonly used executor s:
ProcessPoolExecutor ThreadPoolExecutor
(4) Scheduler
Generally, there is only one scheduler in the application, and the application developer usually does not directly deal with job storage, scheduler and trigger. On the contrary, the scheduler provides a suitable interface to deal with these. Configuring job storage and executors can be done in the scheduler, such as adding, modifying, and removing jobs.
Configure scheduler
APScheduler provides many different ways to configure the scheduler. You can use a configuration dictionary or pass it in as a parameter keyword. You can also create a scheduler first, then configure and add jobs, so that you can get more flexibility in different environments.
Here is a simple example of BlockingScheduler:
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # Define BlockingScheduler sched = BlockingScheduler() sched.add_job(job, 'interval', seconds=5) sched.start()
The above code creates a BlockingScheduler and uses the default memory storage and default actuator. (the default options are MemoryJobStore and ThreadPoolExecutor respectively, where the maximum number of threads in the thread pool is 10). After the configuration is completed, use the start() method to start.
If you want to explicitly set the job store (using mongo storage) and the executor, you can write this:
from datetime import datetime from pymongo import MongoClient from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor # MongoDB parameters host = '127.0.0.1' port = 27017 client = MongoClient(host, port) # Output time def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # Storage mode jobstores = { 'mongo': MongoDBJobStore(collection='job', database='test', client=client), 'default': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(10), 'processpool': ProcessPoolExecutor(3) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo') scheduler.start()
Operation on job
Add job
There are two ways to add a job:
add_job() scheduled_job()
The second method is only applicable to jobs that will not change during the application run, while the first method returns an apscheduler job. An instance of a job, which can be used to change or remove a job.
from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() # Decorator @sched.scheduled_job('interval', id='my_job_id', seconds=5) def job_function(): print("Hello World") # start sched.start()
@sched.scheduled_job() is a decorator for Python.
Remove job
There are also two ways to remove a job:
remove_job() job.remove()
remove_job removed with jobID
job.remove() uses add_ Instance returned by job()
job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() # id scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')
Pause and resume job s
Pause a job:
apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job()
Restore a job:
apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()
I hope you remember apscheduler job. Job is add_ Instance returned by job()
Get job list
To get the list of schedulable jobs, you can use get_jobs(), which will return all job instances.
You can also use print_jobs() to output a list of all formatted job s.
Modify job
All properties of a job except jobID can be modified, using apscheduler job. Job. Modify() or modify_job() modifies the properties of a job
job.modify(max_instances=6, name='Alternate name') modify_job('my_job_id', trigger='cron', minute='*/5')
Close job
By default, the scheduler will shut down all schedulers and job storage after all jobs are completed. Set the wait option to False to turn it off immediately.
scheduler.shutdown() scheduler.shutdown(wait=False) scheduler event
The scheduler can add event listeners and trigger them at special times.
def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') # Add listener scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
trigger rule
date
The most basic kind of scheduling is that the job will be executed only once. Its parameters are as follows:
run_date (datetime|str) – the date/time to run the job at
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn't have one already
from datetime import date from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() def my_job(text): print(text) # The job will be executed on November 6th, 2009 sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text']) sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text']) # The 'date' trigger and datetime.now() as run_date are implicit sched.add_job(my_job, args=['text']) sched.start()
cron
year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) second (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
expression:
from apscheduler.schedulers.blocking import BlockingScheduler def job_function(): print("Hello World") # BlockingScheduler sched = BlockingScheduler() # Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30') sched.start()
interval
Parameters:
- weeks (int) – number of weeks to wait
- days (int) – number of days to wait
- hours (int) – number of hours to wait
- minutes (int) – number of minutes to wait
- seconds (int) – number of seconds to wait
- start_date (datetime|str) – starting point for the interval
calculation - end_date (datetime|str) – latest possible date/time to trigger on
- timezone (datetime.tzinfo|str) – time zone to use for the date/time
calculations
from datetime import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job_function(): print("Hello World") # BlockingScheduler sched = BlockingScheduler() # Schedule job_function to be called every two hours sched.add_job(job_function, 'interval', hours=2) # The same as before, but starts on 2010-10-10 at 9:30 and stops on 2014-06-15 at 11:00 sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00') sched.start()