Executing methods concurrently with a thread pool in Java

During development we often need to run a method concurrently. Search the Internet for how to do it, and most answers either create a raw Thread or first walk you through the JMM, memory between threads, and inter-thread communication mechanisms.
That is a good way to consolidate knowledge. But when all you need is a working example of running methods on multiple threads, pages of theory are time-consuming and laborious, and can even make you doubt your own ability.

Suppose there is a business requirement: I want to fetch the data for two types, story and joke. Done serially, that means
querying the story data first and then querying the joke data.
That doesn't matter when the amount of data is small. When it is large, say one hour to query the stories and one hour to query the jokes, the serial version takes two hours, while the concurrent version finishes in about one hour (measured in my environment).

First, create a thread pool:

private ExecutorService cacheExecutor = Executors.newCachedThreadPool();

Query story and query joke, serial operation:

List<String> storySubTypeList = countStory("story");
List<String> jokeSubTypeList = countStory("joke");

Now stop running them serially and let them execute concurrently. Intuitively, the two queries overlap, so the total time can be cut roughly in half.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Query the story sub-types, then process the data
cacheExecutor.execute(new Runnable() {
    @Override
    public void run() {
        List<String> storySubTypeList = countStory("story");
        // Hard-coded bounds: reads indexes 1..6, so the list needs at least 7 entries
        for (int i = 1; i <= 6; i++) {
            // Print the log. The statement is wrapped across lines for readability.
            log.info("call " + this.getClass() + "."
                + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " inputParam:" + storySubTypeList.get(i));
            // Query method. The statement is wrapped across lines for readability.
            List<String> ids = storySearchDao.searchIds("story",
                storySubTypeList.get(i), Common.STORY_ID_LIST_MAXNUM, 0);

            // storyTypeMap is written from two threads, so it should be a
            // thread-safe map (for example a ConcurrentHashMap)
            storyTypeMap.put(storySubTypeList.get(i), ids);
        }
    }
});
// Query the joke sub-types, then process the data
cacheExecutor.execute(new Runnable() {
    @Override
    public void run() {
        List<String> jokeSubTypeList = countStory("joke");
        // Hard-coded bounds: reads indexes 1..6, so the list needs at least 7 entries
        for (int i = 1; i <= 6; i++) {
            // Print the log. The statement is wrapped across lines for readability.
            log.info("call " + this.getClass() + "."
                + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " inputParam:" + jokeSubTypeList.get(i));
            // Query method. The statement is wrapped across lines for readability.
            List<String> ids = storySearchDao.searchIds("joke",
                jokeSubTypeList.get(i), Common.STORY_ID_LIST_MAXNUM, 0);

            // storyTypeMap is written from two threads, so it should be a
            // thread-safe map (for example a ConcurrentHashMap)
            storyTypeMap.put(jokeSubTypeList.get(i), ids);
        }
    }
});
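
One thing the two execute() calls do not give the caller is a way to know when both queries have finished. A minimal sketch of one alternative, using invokeAll to wait for both tasks, might look like this. Everything in it is illustrative: countStory is a hypothetical stand-in that only returns fixed data, and the class name ParallelQueryDemo and the result map are not from my project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelQueryDemo {

    // Hypothetical stand-in for the real query; returns fixed sub-types.
    static List<String> countStory(String type) {
        return Arrays.asList(type + "-a", type + "-b");
    }

    // Runs both queries concurrently and returns once both have finished.
    static Map<String, List<String>> queryBoth() throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        // Thread-safe map, because two worker threads write to it.
        Map<String, List<String>> result = new ConcurrentHashMap<>();
        try {
            Callable<Void> storyTask = () -> {
                result.put("story", countStory("story"));
                return null;
            };
            Callable<Void> jokeTask = () -> {
                result.put("joke", countStory("joke"));
                return null;
            };
            // invokeAll blocks until every task has completed.
            for (Future<Void> f : pool.invokeAll(Arrays.asList(storyTask, jokeTask))) {
                f.get(); // rethrows any exception thrown inside a task
            }
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(queryBoth());
    }
}
```

The pool is shut down in a finally block so that the worker threads do not keep the JVM alive after the work is done.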

Names such as storyTypeMap, storySearchDao, and countStory are specific to my project, so don't worry about what they mean. The point here is how to quickly create a thread pool, hand tasks to it, and let its threads run them.

The book "The Art of Java Concurrent Programming" introduces thread pools as follows:
(The following content is optional, but I suggest reading it. I think it is a particularly good summary from this book.)
Through Executors, the utility class of the Executor framework, three types of ThreadPoolExecutor can be created:

FixedThreadPool

FixedThreadPool is known as a reusable thread pool with a fixed number of threads. Its corePoolSize and maximumPoolSize are both set to nThreads, the parameter specified when the FixedThreadPool is created.
When the number of threads in the pool is greater than corePoolSize, keepAliveTime is the maximum time that the excess idle threads wait for new tasks; after this time the excess threads are terminated. Here keepAliveTime is set to 0L, which means that excess idle threads are terminated immediately.
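
These parameters can be read back from a running pool. The following is a small sketch, assuming the JDK in use returns a plain ThreadPoolExecutor from Executors.newFixedThreadPool (true for the OpenJDK versions I know of, but the API only promises an ExecutorService). The class name FixedPoolParams is made up for this example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedPoolParams {

    // Returns {corePoolSize, maximumPoolSize, keepAliveTime in milliseconds}.
    static long[] inspect(int nThreads) {
        // Assumption: the returned pool is a ThreadPoolExecutor, so the cast works.
        ThreadPoolExecutor pool =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        try {
            return new long[] {
                pool.getCorePoolSize(),
                pool.getMaximumPoolSize(),
                pool.getKeepAliveTime(TimeUnit.MILLISECONDS)
            };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] p = inspect(4);
        System.out.println("core=" + p[0] + " max=" + p[1] + " keepAliveMs=" + p[2]);
    }
}
```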

 

The figure is explained as follows:
1) If the number of currently running threads is less than corePoolSize, a new thread is created to execute the task.
2) Once the thread pool has warmed up (the number of currently running threads equals corePoolSize), the task is added to the LinkedBlockingQueue.
3) After a thread finishes the task from step 1, it repeatedly takes tasks from the LinkedBlockingQueue in a loop and executes them.
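
The three steps can be observed with a small experiment. This is only a sketch: the class name FixedPoolFlow is invented, and it again assumes the pool can be cast to ThreadPoolExecutor. Each task blocks on a latch, so the workers stay busy while the snapshot is taken.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolFlow {

    // Submits `tasks` blocking tasks to a pool of `nThreads` threads and
    // returns {pool size, queued task count} while the tasks are still blocked.
    static int[] snapshot(int nThreads, int tasks) {
        // Assumption: the returned pool is a ThreadPoolExecutor, so the cast works.
        ThreadPoolExecutor pool =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // unblock the workers so they drain the queue
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = snapshot(2, 5);
        // Two tasks got their own thread (step 1), three wait in the queue (step 2).
        System.out.println("threads=" + s[0] + " queued=" + s[1]);
    }
}
```

Once the latch is released, the two workers take the remaining three tasks from the queue, which is step 3.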

FixedThreadPool uses the unbounded queue LinkedBlockingQueue as the work queue of the thread pool (the capacity of the queue is Integer.MAX_VALUE). Using an unbounded queue as the work queue has the following effects on the thread pool.
1) Once the number of threads in the pool reaches corePoolSize, new tasks wait in the unbounded queue, so the number of threads in the pool never exceeds corePoolSize.
2) Because of 1, maximumPoolSize has no effect when an unbounded queue is used.
3) Because of 1 and 2, keepAliveTime has no effect when an unbounded queue is used.
4) Because the queue is unbounded, a running FixedThreadPool (one on which neither shutdown() nor shutdownNow() has been called) never rejects a task (RejectedExecutionHandler.rejectedExecution is never called).
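
To see point 4 in practice, here is a sketch that compares a FixedThreadPool with a hand-built pool that has a bounded queue. The class name QueueRejectionDemo, the task count of 100, and the queue capacity of 1 are all arbitrary choices for the demonstration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueRejectionDemo {

    // Submits `tasks` blocking tasks and returns how many were rejected.
    static int countRejected(ExecutorService pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    // One thread, unbounded LinkedBlockingQueue: nothing is rejected.
    static int unboundedRejections() {
        return countRejected(Executors.newFixedThreadPool(1), 100);
    }

    // One thread, queue capacity 1, default AbortPolicy: one task runs,
    // one waits in the queue, and the remaining 98 are rejected.
    static int boundedRejections() {
        ExecutorService bounded = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        return countRejected(bounded, 100);
    }

    public static void main(String[] args) {
        System.out.println("unbounded rejected=" + unboundedRejections());
        System.out.println("bounded rejected=" + boundedRejections());
    }
}
```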

SingleThreadExecutor

SingleThreadExecutor is an Executor that uses a single worker thread. Its corePoolSize and maximumPoolSize are set to 1; the other parameters are the same as for FixedThreadPool.
SingleThreadExecutor uses the unbounded queue LinkedBlockingQueue as the work queue of the thread pool (the capacity of the queue is Integer.MAX_VALUE).
The effects of using an unbounded queue as the work queue are the same as for FixedThreadPool.
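
A practical consequence of one worker thread plus a FIFO queue is that tasks run one at a time, in the order they were submitted. A short sketch (the class name SingleThreadOrder is invented for the example):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrder {

    // Submits `tasks` numbered tasks and returns the order in which they ran.
    static List<Integer> runInOrder(int tasks) throws InterruptedException {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        single.shutdown(); // tasks that are already queued still run
        single.awaitTermination(10, TimeUnit.SECONDS);
        return new ArrayList<>(order);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```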

 

 

 

CachedThreadPool

CachedThreadPool is a thread pool that creates new threads as needed.
Its corePoolSize is set to 0, i.e. the corePool is empty; its maximumPoolSize is set to Integer.MAX_VALUE, i.e. the maximumPool is unbounded. keepAliveTime is set to 60L, which means that an idle thread in a CachedThreadPool waits at most 60 seconds for a new task and is terminated once it has been idle for longer than that.
FixedThreadPool and SingleThreadExecutor use the unbounded queue LinkedBlockingQueue as the work queue of the thread pool. CachedThreadPool uses SynchronousQueue, a queue with no capacity, as its work queue, but its maximumPool is unbounded. This means that if the main thread submits tasks faster than the threads in the maximumPool can process them, CachedThreadPool keeps creating new threads. In extreme cases, CachedThreadPool exhausts CPU and memory resources by creating too many threads.
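
The thread growth is easy to reproduce on a small scale. In this sketch (class name CachedPoolGrowth is made up, and the cast to ThreadPoolExecutor is an assumption about the JDK in use), every task blocks, so no idle thread is ever available and each submission starts a new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class CachedPoolGrowth {

    // Submits `tasks` blocking tasks and returns {pool size, queued task count}
    // while all of them are still blocked.
    static int[] snapshot(int tasks) {
        // Assumption: the returned pool is a ThreadPoolExecutor, so the cast works.
        ThreadPoolExecutor pool =
            (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = snapshot(10);
        // One thread per task, and the SynchronousQueue never holds anything.
        System.out.println("threads=" + s[0] + " queued=" + s[1]);
    }
}
```

With ten tasks this is harmless; with tasks arriving faster than they finish for a long time, the same behavior is what exhausts resources.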

 

 

Note: in actual development, rather than using this thread pool directly, it is more elegant to execute tasks with a thread pool whose parameters can be tuned, combined with async.
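
As a rough illustration of that note, here is one possible reading of it: a ThreadPoolExecutor built with explicit parameters, used as the executor for CompletableFuture.supplyAsync. This is my assumption about what "async" could look like in plain Java; a framework-level async mechanism would be a different setup. The class name TunablePoolDemo, the pool sizes, the queue capacity, and the returned strings are all placeholders, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TunablePoolDemo {

    // Every number here is illustrative and should be tuned per workload.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
            2,                                   // corePoolSize
            4,                                   // maximumPoolSize
            30L, TimeUnit.SECONDS,               // keepAliveTime for extra threads
            new ArrayBlockingQueue<>(100),       // bounded work queue
            new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure when full
    }

    // Runs two placeholder queries asynchronously and combines their results.
    static String queryBoth() {
        ThreadPoolExecutor pool = newPool();
        try {
            CompletableFuture<String> story =
                CompletableFuture.supplyAsync(() -> "story-data", pool);
            CompletableFuture<String> joke =
                CompletableFuture.supplyAsync(() -> "joke-data", pool);
            // join() waits until both futures have completed.
            return story.thenCombine(joke, (s, j) -> s + "," + j).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(queryBoth()); // prints story-data,joke-data
    }
}
```

A bounded queue together with a bounded maximumPoolSize avoids both the unlimited queue of FixedThreadPool and the unlimited thread creation of CachedThreadPool.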

 

Posted by howtoholdem on Mon, 30 May 2022 11:03:18 +0530