This article exists thanks to a reader, who gave this excellent article the chance to meet you. The key point being "excellent article". Ha ha.
The story goes like this:
No need to thank me; consider it a gift of roses. I believe the following content will not disappoint you, because it aims to be the best article on "delayed tasks" currently available. That has always been my goal in writing: to make every article of mine a little better than what is already out there.
OK, let's move on to today's topic. The main content of this article is shown in the following figure:
What is a delayed task?
As the name suggests, a task that needs to be executed after a delay is called a delayed task.
Typical usage scenarios for delayed tasks are as follows:
- A red envelope has not been claimed within 24 hours, so the refund has to be executed after that delay;
- On the billing day of each month, the statement for the current month has to be sent to the user;
- If the user has not paid 30 minutes after placing an order, the system has to cancel the order automatically.
Other scenarios of this kind also call for delayed tasks.
Analysis of how to implement delayed tasks
The key to implementing a delayed task is to execute it at a specific point in time. Based on this, we can think of the following two approaches:
- Write an infinite loop yourself that keeps checking whether any task is due at the current time;
- Use the JDK or utility classes provided by third parties to implement delayed tasks.
The JDK facilities that come to mind for delayed tasks are DelayQueue and ScheduledExecutorService. Third parties offer many more options, such as Redis, Netty, and MQ.
Delayed task implementations
Next, we walk through each implementation of delayed tasks together with its code.
1. Implementing delayed tasks with an infinite loop
With this approach, we start an infinite loop that scans the tasks continuously, and use a Map to store each task together with its execution time. The implementation code is as follows:
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Summary of delayed task execution methods
 */
public class DelayTaskExample {
    // Stores the delayed tasks: task name -> execution time (epoch ms)
    private static Map<String, Long> _TaskMap = new HashMap<>();

    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        // Add a delayed task
        _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // Delay 3s
        // Run the infinite loop that executes delayed tasks
        loopTask();
    }

    /**
     * Delayed tasks implemented with an infinite loop
     */
    public static void loopTask() {
        while (true) {
            Iterator<Map.Entry<String, Long>> it = _TaskMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                // The task is due
                if (Instant.now().toEpochMilli() >= entry.getValue()) {
                    // Delayed task: business logic goes here
                    System.out.println("Task: " + entry.getKey()
                            + ", execution time: " + LocalDateTime.now());
                    // Remove the task through the iterator; calling Map.remove()
                    // while iterating can throw ConcurrentModificationException
                    it.remove();
                }
            }
        }
    }
}
The output of the above program is:
Program start time: 2020-04-12T18:51:28.188
Task: task-1, execution time: 2020-04-12T18:51:31.189
It can be seen that the task has been delayed for 3s, which is in line with our expectations.
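The loop in this section checks the clock continuously, so it keeps a CPU core busy while it waits. One possible variation, sketched here as an assumption rather than as part of the original method, keeps the tasks ordered by due time and sleeps until the earliest one is due. The names SleepingLoopExample, Task, and runAll are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class SleepingLoopExample {

    /** A task name plus the epoch-millisecond time at which it becomes due (hypothetical type). */
    static final class Task {
        final String name;
        final long dueAtMillis;

        Task(String name, long dueAtMillis) {
            this.name = name;
            this.dueAtMillis = dueAtMillis;
        }
    }

    /** Runs all tasks in due-time order and returns their names in execution order. */
    static List<String> runAll(List<Task> tasks) {
        // Earliest due time first
        PriorityQueue<Task> queue =
                new PriorityQueue<>((a, b) -> Long.compare(a.dueAtMillis, b.dueAtMillis));
        queue.addAll(tasks);
        List<String> executed = new ArrayList<>();
        while (!queue.isEmpty()) {
            long waitMillis = queue.peek().dueAtMillis - System.currentTimeMillis();
            if (waitMillis > 0) {
                try {
                    Thread.sleep(waitMillis); // Sleep instead of spinning
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting", e);
                }
                continue; // Re-check the clock after waking up
            }
            // Business logic would go here; this sketch only records the name
            executed.add(queue.poll().name);
        }
        return executed;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("task-2", now + 3000));
        tasks.add(new Task("task-1", now + 1000));
        System.out.println("Executed in order: " + runAll(tasks));
    }
}
```

This sketch only handles a fixed list of tasks; adding tasks from other threads while the loop sleeps would need extra coordination, which is the kind of work the JDK classes in the next section take care of.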
2. Implementing delayed tasks with the Java API
The Java API provides two ways to implement delayed tasks: DelayQueue and ScheduledExecutorService.
① Implementing delayed tasks with ScheduledExecutorService
ScheduledExecutorService can run a task repeatedly, with a fixed delay between runs. The implementation code is as follows:
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        scheduledExecutorServiceTask();
    }

    /**
     * Uses ScheduledExecutorService to run a task repeatedly with a fixed delay
     */
    public static void scheduledExecutorServiceTask() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        // Business code for task execution
                        System.out.println("Task execution time: " + LocalDateTime.now());
                    }
                },
                2, // Initial delay before the first run
                2, // Delay between the end of one run and the start of the next
                TimeUnit.SECONDS);
    }
}
The output of the above program is:
Program start time: 2020-04-12T21:28:10.416
Task execution time: 2020-04-12T21:28:12.421
Task execution time: 2020-04-12T21:28:14.422
......
You can see that with the ScheduledExecutorService#scheduleWithFixedDelay(...) method, the task is executed repeatedly, with a fixed delay between runs.
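For a task that should run only once after a delay, such as cancelling an unpaid order, ScheduledExecutorService also provides schedule(...). The following is a minimal sketch of that call; the class name OneShotDelayExample and the helper runOnceAfter are made up for this illustration, and the "task" merely measures how long it waited.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OneShotDelayExample {

    /** Runs a task once after delayMillis and returns how long the wait actually took (ms). */
    static long runOnceAfter(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            // The delayed task: here it only reports the elapsed time
            Callable<Long> task = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            // schedule(...) executes the task a single time after the given delay
            ScheduledFuture<Long> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // Block until the delayed task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Delayed task failed", e);
        } finally {
            executor.shutdown(); // Allow the JVM to exit
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran after about " + runOnceAfter(3000) + " ms");
    }
}
```

The returned ScheduledFuture can also be cancelled with cancel(false) before the delay elapses, which fits cases where the user pays before the timeout.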
② Implementing delayed tasks with DelayQueue
DelayQueue is an unbounded blocking queue whose elements can only be taken once their delay has expired. The elements in the queue must implement the Delayed interface and override the getDelay(TimeUnit) and compareTo(Delayed) methods. The complete code for a delay queue based on DelayQueue is as follows:
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayElement> delayQueue = new DelayQueue<>();
        // Add delayed tasks
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("Start time: " + DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            // take() blocks until the next element's delay has expired
            System.out.println(delayQueue.take());
        }
        System.out.println("End time: " + DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        // Expiry time of the element (unit: ms)
        long delayTime = System.currentTimeMillis();

        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }

        @Override
        // Get the remaining time
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        // Order the elements in the queue by remaining time
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
                    o.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}
The output of the above program is:
Start time: 20:40:38, April 12, 2020
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
End time: 20:40:43, April 12, 2020
3. Implementing delayed tasks with Redis
There are roughly two ways to implement delayed tasks with Redis: polling data stored in a zset, and keyspace notifications.
① Polling zset data
We store the delayed tasks in a zset, using the execution time as the score, and then start an infinite loop that queries the tasks due at the current time and consumes them. The implementation code is as follows (using the Jedis client):
import redis.clients.jedis.Jedis;
import utils.JedisUtils;

import java.time.Instant;
import java.util.Set;

public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        // Delay execution for 30s (score = time 30s from now)
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        // Add more test data
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        // Start consuming the delay queue
        doDelayQueue(jedis);
    }

    /**
     * Delay queue consumption
     * @param jedis Redis client
     */
    public static void doDelayQueue(Jedis jedis) throws InterruptedException {
        while (true) {
            // Current time
            Instant nowInstant = Instant.now();
            long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // One second ago
            long nowSecond = nowInstant.getEpochSecond();
            // Query all tasks whose score falls between one second ago and now
            Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
            for (String item : data) {
                // Consume the task
                System.out.println("Consumption:" + item);
            }
            // Delete the tasks that have been executed
            jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
            Thread.sleep(1000); // Poll once per second
        }
    }
}
② Keyspace notifications
By default, the Redis server does not enable keyspace notifications. We need to enable them manually with the command config set notify-keyspace-events Ex. Once keyspace notifications are enabled, we receive an event each time a key expires. We can use this mechanism to attach a timed task to each key: the task runs when the key's expiration event arrives. The implementation code is as follows:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;

public class TaskExample {
    // Subscription channel name
    public static final String _TOPIC = "__keyevent@0__:expired";

    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedis();
        // Perform scheduled tasks
        doTask(jedis);
    }

    /**
     * Subscribe to expired messages and execute scheduled tasks
     * @param jedis Redis client
     */
    public static void doTask(Jedis jedis) {
        // Subscribe to expired messages
        jedis.psubscribe(new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                // Receive the message and execute the scheduled task
                System.out.println("Received message:" + message);
            }
        }, _TOPIC);
    }
}
4. Implementing delayed tasks with Netty
Netty is an open-source Java framework originally provided by JBoss. It is an NIO-based framework for client-side and server-side programming that lets you develop network applications quickly and easily, for example a client and server that implement a particular protocol. Netty simplifies and streamlines the development of network applications, such as socket services based on TCP and UDP.
You can use the utility class HashedWheelTimer provided by Netty to implement delayed tasks. The implementation is as follows.
First, add a Netty dependency to the project. The configuration is as follows:
<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.48.Final</version>
</dependency>
The complete code of the Netty implementation is as follows:
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("Program start time: " + LocalDateTime.now());
        nettyTask();
    }

    /**
     * Netty based delayed task
     */
    private static void nettyTask() {
        // Create the timer
        HashedWheelTimer timer = new HashedWheelTimer(
                3, // Tick duration: the wheel advances one slot every 3s
                TimeUnit.SECONDS,
                100); // Number of slots in the timing wheel
        // Create a task
        TimerTask task = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("Task execution time: " + LocalDateTime.now());
            }
        };
        // Add the task to the timer; with a delay of 0 it runs when the wheel ticks
        timer.newTimeout(task, 0, TimeUnit.SECONDS);
    }
}
The output of the above program is:
Program start time: 2020-04-13T10:16:23.033
Task execution time: 2020-04-13T10:16:26.118
HashedWheelTimer is implemented with a timing wheel. A timing wheel is a ring-shaped data structure. You can picture it as a clock face divided into many slots, where each slot represents a span of time. Each slot holds a linked list of the timeout tasks to be executed. A pointer moves around the wheel one slot at a time, and when it reaches a slot, the delayed tasks in that slot are executed, as shown in the following figure:
(pictures from the Internet)
In the figure above, the timing wheel has 8 slots. The pointer advances one slot per fixed interval (for example, 1s), and each slot points to a linked list that stores the tasks to be executed.
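The idea of the timing wheel can be sketched in a few lines of plain Java. This is a simplified illustration and not Netty's implementation: the class SimpleTimingWheel, its methods, and the "remaining rounds" bookkeeping for delays longer than one rotation are assumptions made for the example, and the pointer is advanced by calling tick() manually instead of by a timer thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class SimpleTimingWheel {

    /** A task plus the number of full wheel rotations it still has to wait. */
    private static final class Entry {
        int remainingRounds;
        final Runnable task;

        Entry(int remainingRounds, Runnable task) {
            this.remainingRounds = remainingRounds;
            this.task = task;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int pointer = 0; // Index of the slot the pointer is currently on

    SimpleTimingWheel(int size) {
        for (int i = 0; i < size; i++) {
            slots.add(new LinkedList<>());
        }
    }

    /** Schedules a task to run on the delayTicks-th call to tick() from now (delayTicks >= 1). */
    void schedule(int delayTicks, Runnable task) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int size = slots.size();
        int slot = (pointer + delayTicks) % size; // Slot the pointer will be on at that tick
        int rounds = (delayTicks - 1) / size;     // Earlier visits to that slot to skip
        slots.get(slot).add(new Entry(rounds, task));
    }

    /** Moves the pointer one slot forward and runs the tasks that are due in that slot. */
    void tick() {
        pointer = (pointer + 1) % slots.size();
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = slots.get(pointer).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--; // Not due yet: wait for another rotation
            }
        }
        // Run after the scan so that a task may safely schedule new tasks
        for (Runnable task : due) {
            task.run();
        }
    }

    public static void main(String[] args) {
        SimpleTimingWheel wheel = new SimpleTimingWheel(8);
        wheel.schedule(3, () -> System.out.println("task A runs"));
        wheel.schedule(10, () -> System.out.println("task B runs"));
        for (int tick = 1; tick <= 10; tick++) {
            System.out.println("tick " + tick);
            wheel.tick();
        }
    }
}
```

With a wheel of 8 slots, task A runs on the third tick, while task B (10 ticks) lands in a slot that the pointer passes once before the task is due. Adding a task and advancing the pointer both touch only a single slot, which is why this structure suits large numbers of timeouts.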
5. Implementing delayed tasks with MQ
Setting up MQ middleware solely to run delayed tasks would be overkill. However, if an MQ environment is already in place, using it to implement delayed tasks is a reasonable choice.
Almost all MQ middleware can implement delayed tasks, which in this context are more accurately called delay queues. This article uses RabbitMQ as an example to show how it is done.
RabbitMQ can implement delay queues in two ways:
- After a message expires, it enters the dead letter exchange, which forwards it to the queue that is consumed after the delay;
- Use the rabbitmq-delayed-message-exchange plug-in to implement the delay.
Note: rabbitmq-delayed-message-exchange is supported only in RabbitMQ 3.5.7 and above, and depends on Erlang/OTP 18.0 and above.
Since the dead letter exchange approach is cumbersome, it is recommended to use the rabbitmq-delayed-message-exchange plug-in to implement the delay queue.
First, we need to download and install the rabbitmq-delayed-message-exchange plug-in. The download address is: http://www.rabbitmq.com/community-plugins.html
Select the version that matches your server, download it, and copy it to the plugins directory of the RabbitMQ server. Enable the plug-in with the command rabbitmq-plugins enable rabbitmq_delayed_message_exchange, and use the command rabbitmq-plugins list to query all installed plug-ins. A successful installation is shown in the following figure:
Finally, restart the RabbitMQ service to make the plug-in take effect.
Next, we need to configure the message queue. The implementation code is as follows:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";

    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // Configure the delayed exchange
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // The second parameter is the exchange type: it must be x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // Bind the queue to the exchange
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}
Then add the code that sends the message. The implementation is as follows:
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Sent on:" + sf.format(new Date()));
        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // Delay delivery by 3000 ms
                        message.getMessageProperties().setHeader("x-delay", 3000);
                        return message;
                    }
                });
    }
}
Add the code that consumes the message:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receiving time:" + sdf.format(new Date()));
        System.out.println("Message content:" + msg);
    }
}
Finally, let's test it with the following code:
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
    @Autowired
    private DelayedSender sender;

    @Test
    public void test() throws InterruptedException {
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); // Wait for the receiver to run before the test exits
    }
}
The output of the above program is as follows:
Sent on: April 13, 2020 20:47:51
Receiving time: 20:47:54, April 13, 2020
Message content: Hi Admin
The output shows that the message was received 3s after it was sent, which is what we expect from a delayed task.
6. Scheduling tasks with Spring
If your project uses Spring or Spring Boot, you can implement this with the @Scheduled annotation. This article uses a Spring Boot project to demonstrate it. First, we need to declare that scheduling is enabled. The implementation code is as follows:
@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
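Then add a delayed task. The implementation code is as follows:
import java.time.LocalDateTime;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ScheduleJobs {
    // Run again 2s after the previous run has finished
    @Scheduled(fixedDelay = 2 * 1000)
    public void fixedDelayJob() {
        System.out.println("Task execution time: " + LocalDateTime.now());
    }
}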
After we start the project, we can see that the task is executed repeatedly with a 2s delay between runs. The output is as follows:
Task execution time: 2020-04-13T14:07:53.349
Task execution time: 2020-04-13T14:07:55.350
Task execution time: 2020-04-13T14:07:57.351
...
We can also use a cron expression to define when the task runs, for example, @Scheduled(cron = "0/4 * * * * ?"), which runs the task every 4 seconds.
7. Implementing delayed tasks with Quartz
Quartz is a powerful task scheduler that supports more complex scheduling requirements, including distributed task scheduling.
Let's use Quartz to implement a delayed task. First, we define the job to be executed:
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

public class SampleJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext)
            throws JobExecutionException {
        System.out.println("Task execution time:" + LocalDateTime.now());
    }
}
Then define a JobDetail and a Trigger. The implementation code is as follows:
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SampleScheduler {
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class)
                .withIdentity("sampleJob")
                .storeDurably()
                .build();
    }

    @Bean
    public Trigger sampleJobTrigger() {
        // Simple schedule with a 3s interval and a repeat count of 1
        SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
                .withIntervalInSeconds(3)
                .withRepeatCount(1);
        return TriggerBuilder.newTrigger()
                .forJob(sampleJobDetail())
                .withIdentity("sampleTrigger")
                .withSchedule(scheduleBuilder)
                .build();
    }
}
Finally, start the delayed task once the Spring Boot project has started. The implementation code is as follows:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/**
 * Executed after the Spring Boot project has started
 */
public class MyStartupRunner implements CommandLineRunner {
    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private SampleScheduler sampleScheduler;

    @Override
    public void run(String... args) throws Exception {
        // Start the scheduled task
        schedulerFactoryBean.getScheduler().scheduleJob(
                sampleScheduler.sampleJobTrigger());
    }
}
The output of the above program is as follows:
2020-04-13 19:02:12.331 INFO 17768 --- [ restartedMain] com.example.demo.DemoApplication : Started DemoApplication in 1.815 seconds (JVM running for 3.088)
Task execution time: 2020-04-13T19:02:15.019
It can be seen from the results that the delayed task was executed 3s after the project was started.
Summary
This article describes the usage scenarios of delayed tasks and 10 ways to implement them:
- A hand-written infinite loop;
- ScheduledExecutorService;
- DelayQueue;
- Redis zset polling;
- Redis keyspace notifications;
- The HashedWheelTimer utility class provided by Netty;
- RabbitMQ dead letter queue;
- The RabbitMQ delayed message plug-in rabbitmq-delayed-message-exchange;
- Spring Scheduled;
- Quartz.
Last words
As the saying goes: one minute on stage takes ten years of practice off stage. This article distills the author's years of work experience and took many late nights to put together. If you find it helpful, please share it so that more people can see it. Thank you.