In practice: 10 ways to implement delayed tasks, with code

This article exists thanks to a reader, who gave this excellent article the chance to meet you. The key word is "excellent". Ha ha.

The story goes like this

 

No need to thank me; giving is its own reward. I believe the content below will not disappoint you, because it will be the best article on "delayed tasks" currently available. That has always been my goal in writing: to make every article a little better than what is already out there.

OK, let's move on to today's topic. The main content of this article is shown in the following figure:

What is a delayed task?

As the name suggests, a delayed task is a task whose execution needs to be postponed.

Typical usage scenarios for delayed tasks are as follows:

  1. A red envelope has not been opened within 24 hours, so the refund must be triggered after that delay;

  2. On the monthly billing day, the statement for the current month must be sent to the user;

  3. If the user has not paid 30 minutes after placing an order, the system must cancel the order automatically.

Events like these call for delayed tasks.

How delayed tasks can be implemented

The key to implementing a delayed task is to execute it at a specific point in time. With that in mind, there are two broad ways to implement delayed tasks:

  1. Write an infinite loop yourself that keeps checking whether any task is due at the current time;

  2. Use the JDK or third-party tools to implement delayed tasks.

In the JDK, the relevant classes are DelayQueue and ScheduledExecutorService. There are also many third-party options, such as Redis, Netty, and MQ.

Implementing delayed tasks

Next, we walk through each implementation with code.

1. Implementing delayed tasks with an infinite loop

In this approach, we start an infinite loop that keeps scanning the tasks, and use a Map to store each task together with the time at which it should run. The implementation code is as follows:

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Summary of delayed task execution methods
 */
public class DelayTaskExample {
    //Store scheduled tasks
    private static Map<String, Long> _TaskMap = new HashMap<>();

    public static void main(String[] args) {
        System.out.println("Program start time:" + LocalDateTime.now());
        //Add scheduled task
        _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); //Delay 3s

        //Call infinite loop to implement deferred task
        loopTask();
    }

    /**
     * Infinite loop implementation delay task
     */
    public static void loopTask() {
        while (true) {
            Iterator<Map.Entry<String, Long>> it = _TaskMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                long executeAt = entry.getValue();
                //The task is due
                if (Instant.now().toEpochMilli() >= executeAt) {
                    //Delayed task: run the business logic
                    System.out.println("Perform tasks:" + entry.getKey() +
                            " ,Execution time:" + LocalDateTime.now());
                    //Delete the task through the iterator; calling _TaskMap.remove() here
                    //can throw ConcurrentModificationException when other tasks remain
                    it.remove();
                }
            }
        }
    }
}

The output of the program above is:

Program start time: 2020-04-12T18:51:28.188

Task: task-1, execution time: 2020-04-12T18:51:31.189

As shown, the task ran after a 3s delay, as expected.
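The scan step of the loop can also be pulled out into a method that receives the current time as an argument, which makes it easier to check the behavior with several tasks at once. The following is only a minimal sketch under that assumption; the class name LoopScanSketch and the method pollDueTasks are hypothetical names that do not appear in the example above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical class name, used only for this sketch
public class LoopScanSketch {
    /**
     * Removes every task whose execution time has been reached and returns the task names.
     * Entries are removed through the iterator, which is safe during iteration.
     */
    public static List<String> pollDueTasks(Map<String, Long> tasks, long nowMillis) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = tasks.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis >= entry.getValue()) {
                due.add(entry.getKey());
                it.remove();
            }
        }
        return due;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> tasks = new HashMap<>();
        long start = System.currentTimeMillis();
        tasks.put("task-1", start + 1000); // due after 1s
        tasks.put("task-2", start + 2000); // due after 2s
        while (!tasks.isEmpty()) {
            for (String name : pollDueTasks(tasks, System.currentTimeMillis())) {
                System.out.println("Perform tasks:" + name);
            }
            Thread.sleep(100); // short pause so the loop does not keep a CPU core busy
        }
    }
}
```

The short sleep between scans trades a little timing precision (up to 100ms here) for a much lower CPU load than a tight loop.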

2. Implementing delayed tasks with the Java API

The Java API provides two ways to implement delayed tasks: DelayQueue and ScheduledExecutorService.

① ScheduledExecutorService implements delayed tasks

We can use ScheduledExecutorService to run a task repeatedly at a fixed interval. The implementation code is as follows:

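import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {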
    public static void main(String[] args) {
        System.out.println("Program start time:" + LocalDateTime.now());
        scheduledExecutorServiceTask();
    }

    /**
     * Use ScheduledExecutorService to run a task repeatedly with a fixed delay
     */
    public static void scheduledExecutorServiceTask() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        //Business code for task execution
                        System.out.println("Perform tasks" +
                                " ,Execution time:" + LocalDateTime.now());
                    }
                },
                2, //Initial delay before the first execution
                2, //Delay between the end of one run and the start of the next
                TimeUnit.SECONDS);
    }
}

The output of the program above is:

Program start time: 2020-04-12T21:28:10.416

Task execution time: 2020-04-12T21:28:12.421

Task execution time: 2020-04-12T21:28:14.422

......

As you can see, with ScheduledExecutorService#scheduleWithFixedDelay(...), the task runs repeatedly, with a fixed delay (here 2s) between the end of one execution and the start of the next.
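scheduleWithFixedDelay repeats the task indefinitely. For a task that should run only once after a delay, such as cancelling an unpaid order, ScheduledExecutorService also provides schedule(...). The following is a minimal sketch of that variant; the class name OneShotDelaySketch and the method runOnceAfter are hypothetical, and the "business logic" only reports how long it waited.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this sketch
public class OneShotDelaySketch {
    /**
     * Runs a task once after delayMillis and returns the measured wait in milliseconds.
     */
    public static long runOnceAfter(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        // Stand-in for the delayed business logic: report how long the task waited
        Callable<Long> task = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        try {
            // schedule(...) runs the task a single time after the given delay
            ScheduledFuture<Long> future =
                    executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // block until the task has run
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Delayed task did not complete", e);
        } finally {
            executor.shutdown(); // without this the pool thread keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran after about " + runOnceAfter(2000) + " ms");
    }
}
```

In a real service the executor would normally be created once and shared rather than created and shut down per task; it is done here only to keep the sketch self-contained.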

② DelayQueue implements delayed tasks

DelayQueue is an unbounded blocking queue whose elements can only be taken once their delay has expired. Elements in the queue must implement the Delayed interface and override the getDelay(TimeUnit) and compareTo(Delayed) methods. The complete code for implementing a delay queue with DelayQueue is as follows:

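import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayElement> delayQueue = new DelayQueue<>();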
        //Add deferred task
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("Start time:" +  DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()){
            //Perform deferred tasks
            System.out.println(delayQueue.take());
        }
        System.out.println("End time:" +  DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        //Delay deadline (unit: ms)
        long delayTime = System.currentTimeMillis();
        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }
        @Override
        //Get remaining time
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        //Order elements in the queue by remaining delay
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }
        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}

The output of the program above is:

Start time: 20:40:38, April 12, 2020

2020-4-12 20:40:39 

2020-4-12 20:40:41 

2020-4-12 20:40:43 

End time: 20:40:43, April 12, 2020
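In practice a delayed element usually carries some information about the task to run, not only its deadline. The following is a minimal sketch of that idea, assuming each task is identified by a name; the names NamedDelayTaskSketch, NamedTask and drainInOrder are hypothetical and introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this sketch
public class NamedDelayTaskSketch {
    /** A delayed element that carries a task name in addition to its deadline. */
    public static class NamedTask implements Delayed {
        public final String name;
        private final long deadlineMillis;

        public NamedTask(String name, long delayMillis) {
            this.name = name;
            this.deadlineMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineMillis - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                    other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Takes every queued task and returns the names in the order they expired. */
    public static List<String> drainInOrder(DelayQueue<NamedTask> queue) {
        List<String> names = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                names.add(queue.take().name); // take() blocks until the head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop draining
        }
        return names;
    }

    public static void main(String[] args) {
        DelayQueue<NamedTask> queue = new DelayQueue<>();
        queue.put(new NamedTask("cancel-order-2", 600));
        queue.put(new NamedTask("cancel-order-1", 200));
        System.out.println(drainInOrder(queue));
    }
}
```

Although cancel-order-2 is added first, the queue hands out cancel-order-1 first because its deadline is earlier; insertion order does not matter.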

3. Implementing delayed tasks with Redis

There are roughly two ways to implement delayed tasks with Redis: polling a zset, and keyspace notifications.

① Polling a zset

We store the delayed tasks in a zset, using each task's execution time as its score, and then start an infinite loop that queries the tasks that are due and consumes them. The implementation code is as follows (using the Jedis client):

import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;

public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";
    
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        //Delay execution for 30s (time after 30s)
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        //Continue adding test data
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        //Open delay queue
        doDelayQueue(jedis);
    }

    /**
     * Delay queue consumption
     * @param jedis Redis client
     */
    public static void doDelayQueue(Jedis jedis) throws InterruptedException {
        while (true) {
            //Current time (epoch seconds)
            long nowSecond = Instant.now().getEpochSecond();
            //Query every task that is due (score <= now). Starting from 0 rather than
            //"one second ago" means a slow iteration cannot skip a task.
            Set<String> data = jedis.zrangeByScore(_KEY, 0, nowSecond);
            for (String item : data) {
                //Consume the task
                System.out.println("Consumption:" + item);
                //Delete only the task just consumed, so tasks added in the meantime are kept
                jedis.zrem(_KEY, item);
            }
            Thread.sleep(1000); //Poll once per second
        }
    }
}

② Keyspace notifications

By default, the Redis server does not enable keyspace notifications. We need to enable them manually with the command config set notify-keyspace-events Ex. Once they are enabled, we receive an event each time a key expires. We can use this mechanism to schedule a task per key: store a key with an expiration time, and run the task when its expiry event arrives. The implementation code is as follows:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;

public class TaskExample {
    public static final String _TOPIC = "__keyevent@0__:expired"; //Subscription channel name
    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedis();
        //Perform scheduled tasks
        doTask(jedis);
    }

    /**
     * Subscribe to expired messages and execute scheduled tasks
     * @param jedis Redis client
     */
    public static void doTask(Jedis jedis) {
        //Subscribe to expired messages
        jedis.psubscribe(new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                //Receive the message and execute the scheduled task
                System.out.println("Received message:" + message);
            }
        }, _TOPIC);
    }
}

4. Implementing delayed tasks with Netty

Netty is an open source Java framework that originated at JBoss. It is an NIO-based framework for writing client and server programs that lets you develop network applications quickly and easily, for example a client and server that implement a particular protocol. Netty simplifies and streamlines network programming, such as developing TCP and UDP socket services.

You can use the HashedWheelTimer utility class provided by Netty to implement delayed tasks.

First, add the Netty dependency to the project. The configuration is as follows:

<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.48.Final</version>
</dependency>

The complete code of Netty implementation is as follows:

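import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class DelayTaskExample {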
    public static void main(String[] args) {
        System.out.println("Program start time:" + LocalDateTime.now());
        NettyTask();
    }

    /**
     * Netty based deferred tasks
     */
    private static void NettyTask() {
        //Create the timer
        HashedWheelTimer timer = new HashedWheelTimer(3, //Duration of one tick
                TimeUnit.SECONDS,
                100); //Number of slots in the time wheel
        //Create a task
        TimerTask task = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("Perform tasks" +
                        " ,Execution time:" + LocalDateTime.now());
            }
        };
        //Submit the task with a delay of 0; the timer only checks for due tasks
        //once per tick, so it runs at the first tick, about 3s after the timer starts
        timer.newTimeout(task, 0, TimeUnit.SECONDS);

    }
}

The output of the program above is:

Program start time: 2020-04-13T10:16:23.033

Task execution time: 2020-04-13T10:16:26.118

HashedWheelTimer is implemented with a timing wheel, which is essentially a ring-shaped data structure. You can picture it as a clock face divided into slots, each representing a span of time. Each slot holds a linked list of the tasks that time out there. A pointer advances slot by slot, and when it reaches a slot, the delayed tasks stored in that slot are executed, as shown in the following figure:

(pictures from the Internet)

The figure above shows a timing wheel of size 8. The pointer advances one slot per fixed interval (for example, 1s), and each slot points to a linked list that stores the tasks to be executed.
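To make the slot-and-pointer idea concrete, here is a simplified, single-threaded sketch of a timing wheel in plain Java. It is not Netty's actual implementation: the class TimingWheelSketch and its methods are hypothetical, the pointer is advanced by calling tick() manually instead of by a timer thread, and each task stores how many full turns of the wheel it still has to wait, so that delays longer than the wheel size can share the same slots.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical class, a simplified model of a timing wheel (not Netty's code)
public class TimingWheelSketch {
    /** A task stored in a slot, with the number of full wheel turns still to wait. */
    private static class WheelEntry {
        final Runnable task;
        long remainingRounds;

        WheelEntry(Runnable task, long remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<WheelEntry>> slots = new ArrayList<>();
    private long currentTick = 0; // number of ticks processed so far

    public TimingWheelSketch(int wheelSize) {
        for (int i = 0; i < wheelSize; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules a task to run delayTicks ticks from now (delayTicks must be >= 1). */
    public void add(Runnable task, long delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be at least 1");
        }
        int slot = (int) ((currentTick + delayTicks) % slots.size());
        long rounds = (delayTicks - 1) / slots.size(); // full turns before the task is due
        slots.get(slot).add(new WheelEntry(task, rounds));
    }

    /** Moves the pointer forward by one slot and runs the tasks that are due there. */
    public void tick() {
        currentTick++;
        List<WheelEntry> bucket = slots.get((int) (currentTick % slots.size()));
        List<Runnable> due = new ArrayList<>();
        Iterator<WheelEntry> it = bucket.iterator();
        while (it.hasNext()) {
            WheelEntry entry = it.next();
            if (entry.remainingRounds == 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--; // not yet: wait for another turn of the wheel
            }
        }
        // Run after the scan so that a task may safely add new tasks to the wheel
        for (Runnable task : due) {
            task.run();
        }
    }

    public static void main(String[] args) {
        TimingWheelSketch wheel = new TimingWheelSketch(8);
        wheel.add(() -> System.out.println("task A"), 3);
        wheel.add(() -> System.out.println("task B"), 10); // more than one full turn
        for (int i = 1; i <= 10; i++) {
            System.out.println("tick " + i);
            wheel.tick();
        }
    }
}
```

In main, "task A" is printed right after "tick 3" and "task B" right after "tick 10". Adding a task and advancing the pointer both touch only one slot, which is why a timing wheel scales well to large numbers of pending timeouts.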

5. Implementing delayed tasks with MQ

Setting up MQ middleware solely to run delayed tasks would be overkill. However, if an MQ environment is already in place, using it for delayed tasks is a reasonable choice.

Almost all MQ middleware can implement delayed tasks, which in this context are more accurately called delay queues. This article uses RabbitMQ as an example.

RabbitMQ can implement delay queues in two ways:

  • Let a message expire so that it is routed to the dead letter exchange, which forwards it to the queue that consumers read from, achieving the delay;

  • Use the rabbitmq-delayed-message-exchange plug-in.

Note: the rabbitmq-delayed-message-exchange plug-in is supported only in RabbitMQ 3.5.7 and above, and depends on Erlang/OTP 18.0 and above.

Since the dead letter exchange approach is more cumbersome, the rabbitmq-delayed-message-exchange plug-in is recommended for implementing delay queues.

First, download and install the rabbitmq-delayed-message-exchange plug-in. The download address is: http://www.rabbitmq.com/community-plugins.html

Select the version that matches your server, download it, and copy it to the plugins directory of the RabbitMQ server. Enable the plug-in with the command rabbitmq-plugins enable rabbitmq_delayed_message_exchange, then use rabbitmq-plugins list to list all installed plug-ins and confirm that the installation succeeded, as shown in the following figure:

Finally, restart the RabbitMQ service to make the plug-in take effect.

Next, we need to configure the message queue. The implementation code is as follows:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";
    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    //Configure the delayed-message exchange
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //Parameter 2 is of type: it must be x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }
    //Bind the queue to the exchange
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}

Then add the code that sends the message. The implementation is as follows:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Sent on:" + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //Delay in milliseconds before the exchange routes the message to the queue
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}

Add the code that consumes the message:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receiving time:" + sdf.format(new Date()));
        System.out.println("Message content:" + msg);
    }
}

Finally, let's use the code to test:

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    @Autowired
    private DelayedSender sender;

    @Test
    public void Test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); //Wait for the receiving program to execute before exiting the test
    }
}

The output of the program above is as follows:

Sent on: April 13, 2020 20:47:51

Receiving time: 20:47:54, April 13, 2020

Message content: Hi Admin

The output shows that the message was received 3s after it was sent, which is the expected behavior for the delayed task.

6. Scheduling tasks with Spring

If your project uses Spring or Spring Boot, you can rely on the @Scheduled annotation. This article demonstrates it with a Spring Boot project. First, scheduling must be enabled with @EnableScheduling. The implementation code is as follows:

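import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication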
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

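Then add a delayed task. The implementation code is as follows:

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;

@Component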
public class ScheduleJobs {
    @Scheduled(fixedDelay = 2 * 1000)
    public void fixedDelayJob() throws InterruptedException {
        System.out.println("Task execution time:" + LocalDateTime.now());
    }
}

Now, when we start the project, the task runs repeatedly with a 2s delay between executions. The output is as follows:

Task execution time: 2020-04-13T14:07:53.349

Task execution time: 2020-04-13T14:07:55.350

Task execution time: 2020-04-13T14:07:57.351

...

We can also use a cron expression to define how often the task runs, for example, @Scheduled(cron = "0/4 * * * * ?"), which runs the task every 4 seconds.

7. Implementing delayed tasks with Quartz

Quartz is a powerful task scheduler that supports more complex scheduling and also distributed task scheduling.

To implement a delayed task with Quartz, we first define the job to execute:

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

public class SampleJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext)
            throws JobExecutionException {
        System.out.println("Task execution time:" + LocalDateTime.now());
    }
}

Next, define a JobDetail and a Trigger. The implementation code is as follows:

import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SampleScheduler {
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
                .storeDurably().build();
    }

    @Bean
    public Trigger sampleJobTrigger() {
        //Fire once, 3s after the trigger is built. Without startAt(...), a simple trigger
        //fires immediately and only then repeats at the configured interval.
        return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
                .startAt(DateBuilder.futureDate(3, DateBuilder.IntervalUnit.SECOND))
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()).build();
    }
}

Finally, start the delay task after the SpringBoot project is started. The implementation code is as follows:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/**
 * SpringBoot Execute after project startup
 */
public class MyStartupRunner implements CommandLineRunner {

    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private SampleScheduler sampleScheduler;

    @Override
    public void run(String... args) throws Exception {
        //Start scheduled task
        schedulerFactoryBean.getScheduler().scheduleJob(
                sampleScheduler.sampleJobTrigger());
    }
}

The output of the program above is as follows:

2020-04-13 19:02:12.331  INFO 17768 --- [  restartedMain] com.example.demo.DemoApplication         : Started DemoApplication in 1.815 seconds (JVM running for 3.088) 

Task execution time: 2020-04-13T19:02:15.019

The output shows that the delayed task ran about 3s after the project started.

Summary

This article described the usage scenarios of delayed tasks and 10 ways to implement them:

  1. Hand-written infinite loop;

  2. ScheduledExecutorService;

  3. DelayQueue;

  4. Redis zset polling;

  5. Redis keyspace notifications;

  6. The HashedWheelTimer utility class provided by Netty;

  7. RabbitMQ dead letter queue;

  8. The RabbitMQ delayed message plug-in, rabbitmq-delayed-message-exchange;

  9. Spring Scheduled;

  10. Quartz.

Last words

As the saying goes: one minute on stage takes ten years of practice off stage. This article distills the author's years of work experience and was put together with painstaking effort. If you found it helpful, please share it so that more people can see it. Thank you.

Tags: Java Spring Redis RabbitMQ programming language Internet

Posted by rogeriobrito on Fri, 03 Jun 2022 02:09:45 +0530