A comprehensive analysis of multithreading with CompletableFuture
Foreword:
In a low-level API we wrote, queries were slow once the data volume reached hundreds of millions of rows. This document records how we used CompletableFuture, part of the technology stack in that optimization. For large volumes of concurrent data, the overall optimization approach was as follows:
- With transactions: because of transactional semantics, multithreading cannot guarantee accurate data when rows are modified first and then queried (a query on another thread runs on a different connection and may not see changes that are not yet committed). We therefore compared the following query forms (the table has about 50 columns):
  - select * from user a where a.id in(......)
  - select * from user a where (a.id in(......)) or (a.id in(......))
  - SELECT * FROM user a WHERE a.id IN (...) UNION ALL SELECT * FROM user a WHERE a.id IN (...)
  I won't post the detailed benchmark charts, since they are not the focus here. With 500,000 existing rows and 10 concurrent requests, selecting ten columns with the OR-combined IN form over 20,000 IDs was the fastest; the overall execution (including business logic) took under 3.5 s.
- Without transactions: with multithreading under the same conditions, it took 0.7 s. At first I planned to use a thread pool and create one thread per 1,000 records (the new Thread approach), but that turned out to be cumbersome to write. After some guidance from my department lead, I found a much simpler way: multithreaded processing with CompletableFuture, which is the key topic I want to summarize today!
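A minimal sketch of the non-transactional approach: split the IDs into batches of 1,000, query each batch with supplyAsync on a dedicated pool, wait for all of them with allOf, and merge the results. The method queryByIds is hypothetical and only echoes its input in place of the real `select ... where id in (...)` call; the batch size and pool size are illustrative assumptions, not the code from our API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class BatchQueryExample {

    // Hypothetical stand-in for "select * from user a where a.id in (...)"; here it just echoes the IDs.
    static List<Long> queryByIds(List<Long> ids) {
        return new ArrayList<>(ids);
    }

    // Splits ids into batches, queries each batch asynchronously, and merges the results in batch order.
    static List<Long> queryInBatches(List<Long> ids, int batchSize, ExecutorService pool) {
        List<CompletableFuture<List<Long>>> futures = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += batchSize) {
            List<Long> batch = ids.subList(from, Math.min(from + batchSize, ids.size()));
            futures.add(CompletableFuture.supplyAsync(() -> queryByIds(batch), pool));
        }
        // Wait for every batch; join() throws a CompletionException if any batch failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream()
                .flatMap(f -> f.join().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long i = 1; i <= 20_000; i++) {
            ids.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Long> rows = queryInBatches(ids, 1000, pool);
            System.out.println("rows: " + rows.size());
        } finally {
            pool.shutdown();
        }
    }
}
```

Using a dedicated pool rather than the common pool keeps slow database calls from starving other CompletableFuture users in the same JVM.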
Optimum number of threads:
Before getting into CompletableFuture, I want to briefly discuss the optimal number of threads. Creating and destroying threads are expensive operations, and doing so frequently wastes a lot of CPU. Moreover, if every task creates its own thread, the number of threads keeps growing; each thread's stack reserves about 1 MB by default (on common 64-bit JVMs, configurable with -Xss), so too many threads will exhaust memory. That is why we need a thread pool: it reuses threads internally, so memory is not exhausted and threads are not created and destroyed over and over.
So how do we determine the optimal number of threads? (The formulas come later; let's cover the basics first.)
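In Spring Cloud projects we usually configure thread pool parameters through Spring's ThreadPoolTaskExecutor, a wrapper around the JDK's ThreadPoolExecutor (most of the parameter names and the defaults listed below come from the Spring wrapper). So let's start with ThreadPoolExecutor: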
1. Important parameters of ThreadPoolExecutor
- corePoolSize: the number of core threads
  - Core threads stay alive even when there are no tasks to execute
  - While the number of threads is below corePoolSize, the pool creates a new thread for each new task, even if some existing threads are idle
  - When allowCoreThreadTimeout=true (default false), idle core threads are also terminated after the timeout
- queueCapacity: the task queue capacity (a blocking queue)
  - Once the number of threads has reached corePoolSize, new tasks wait in the queue
- maxPoolSize: the maximum number of threads
  - When the number of threads >= corePoolSize and the task queue is full, the pool creates new threads to process tasks
  - When the number of threads = maxPoolSize and the task queue is full, the pool rejects the task (with the default handler, by throwing an exception)
- keepAliveTime: the thread idle time
  - A thread that has been idle for keepAliveTime exits, until the number of threads drops back to corePoolSize
  - If allowCoreThreadTimeout=true, threads keep exiting until the number of threads reaches 0
- allowCoreThreadTimeout: whether core threads may time out
- rejectedExecutionHandler: the handler for rejected tasks
  - A task is rejected in two situations:
    - The number of threads has reached maxPoolSize and the queue is full
    - shutdown() has been called: the pool finishes the tasks already submitted and then terminates, and any task submitted after shutdown() is called is rejected
  - The pool then passes the task to rejectedExecutionHandler. If none is set, the default AbortPolicy throws a RejectedExecutionException
  - ThreadPoolExecutor provides several built-in handlers as nested classes:
    - AbortPolicy: discards the task and throws RejectedExecutionException (a runtime exception)
    - CallerRunsPolicy: runs the task in the thread that submitted it
    - DiscardPolicy: silently discards the task
    - DiscardOldestPolicy: discards the oldest task in the queue (the one that would have run next) and retries submitting the new one
  - Implement the RejectedExecutionHandler interface to define a custom handler
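As a sketch of a custom handler, the hypothetical CountingRejectionHandler below records how many tasks were rejected instead of throwing; a real handler might log the task or hand it to some buffer. The tiny pool (one thread, queue capacity of one) is an assumption chosen only to make rejection easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        // Called from execute() when the pool is at maxPoolSize and the queue is full, or after shutdown().
        rejected.incrementAndGet();
    }

    public int rejectedCount() {
        return rejected.get();
    }

    // Submits `tasks` blocking tasks to a pool with 1 thread and a queue of 1; returns the rejection count.
    static int demo(int tasks) {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the only worker busy so the queue stays full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown(); // let the running and queued tasks finish
        pool.shutdown();
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        // 1 task runs, 1 waits in the queue, so 3 of the 5 are rejected
        System.out.println("rejected: " + demo(5));
    }
}
```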
2. ThreadPoolExecutor execution order
The thread pool handles each new task as follows:
- If the number of threads is less than corePoolSize, a new thread is created.
- If the number of threads is greater than or equal to corePoolSize and the task queue is not full, the task is put into the queue.
- If the number of threads is greater than or equal to corePoolSize and the task queue is full:
  - If the number of threads is less than maxPoolSize, a new thread is created.
  - If the number of threads equals maxPoolSize, the task is rejected (by default, an exception is thrown).
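The order can be observed directly with a small pool. This sketch uses sizes chosen only for illustration (corePoolSize=1, maxPoolSize=2, queue capacity 1), submits tasks that block on a latch so none of them finishes early, and records the pool size and queue length after each execute call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutionOrderDemo {

    // Returns one "poolSize/queueSize" entry per submitted task, or "rejected" if execute() threw.
    static List<String> trace(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); // default AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        List<String> steps = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // block so no task finishes during the experiment
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                steps.add(pool.getPoolSize() + "/" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                steps.add("rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        return steps;
    }

    public static void main(String[] args) {
        // task 1 starts a core thread, task 2 is queued, task 3 starts a
        // non-core thread because the queue is full, task 4 is rejected
        System.out.println(trace(4)); // [1/0, 1/1, 2/1, rejected]
    }
}
```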
3. How to set parameters
- Defaults
  - corePoolSize=1
  - queueCapacity=Integer.MAX_VALUE
  - maxPoolSize=Integer.MAX_VALUE
  - keepAliveTime=60s
  - allowCoreThreadTimeout=false
  - rejectedExecutionHandler=AbortPolicy()
- How to set them
  - They depend on a few values:
    - tasks: the number of tasks per second, assumed to be 500~1000
    - taskcost: the time each task takes, assumed to be 0.1 s
    - responsetime: the maximum response time the system can tolerate, assumed to be 1 s
  - Then do some calculations:
    - corePoolSize = how many threads are needed per second?
      - threadcount = tasks/(1/taskcost) = tasks*taskcost = (500~1000)*0.1 = 50~100 threads, so corePoolSize should be greater than 50
      - By the 80/20 rule, if 80% of the time there are fewer than 800 tasks per second, corePoolSize can be set to 800*0.1 = 80
    - queueCapacity = (corePoolSize/taskcost)*responsetime
      - This gives queueCapacity = 80/0.1*1 = 800, meaning a task can wait in the queue for up to about 1 s; beyond that, a new thread has to be started to run it
      - Do not set it to Integer.MAX_VALUE: the queue would grow without bound and the number of threads would stay at corePoolSize, so when the load spikes no new threads are started and the response time rises sharply
    - maxPoolSize = (max(tasks) - queueCapacity)/(1/taskcost)
      - (maximum tasks per second - queue capacity) / tasks handled per thread per second = (1000-800)/10 = 20
      - This is smaller than corePoolSize, and ThreadPoolExecutor requires maxPoolSize >= corePoolSize, so read the 20 as extra threads on top of the core threads: maxPoolSize = 80 + 20 = 100, which matches the upper bound of 100 threads estimated above
    - rejectedExecutionHandler: depends on the situation. Unimportant tasks can be discarded; important tasks need some buffering mechanism
    - keepAliveTime and allowCoreThreadTimeout: the defaults are usually sufficient
The values above are idealized; in practice, tune them to the machine's actual performance. If CPU usage is already saturated before the pool reaches maxPoolSize, you need to upgrade the hardware (heh) and optimize the code to reduce taskcost.
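The sizing arithmetic can be written out as a small helper. This is a sketch under the section's example assumptions (800 tasks/s at the 80% mark, 1,000 tasks/s peak, taskcost 0.1 s = 100 ms, responsetime 1 s), using whole milliseconds to avoid floating-point rounding. Note that 80/0.1*1 evaluates to 800. Because ThreadPoolExecutor refuses a maximum pool size smaller than the core size, the helper treats the maxPoolSize formula's result as extra threads on top of corePoolSize; that reading is an assumption, not part of the original formula.

```java
public class PoolSizing {

    // threadcount = tasks * taskcost (taskcost given in milliseconds)
    static int threadCount(int tasksPerSecond, int taskCostMs) {
        return tasksPerSecond * taskCostMs / 1000;
    }

    // queueCapacity = (corePoolSize / taskcost) * responsetime
    static int queueCapacity(int corePoolSize, int taskCostMs, int responseTimeMs) {
        int tasksPerThreadPerSecond = 1000 / taskCostMs;
        return corePoolSize * tasksPerThreadPerSecond * responseTimeMs / 1000;
    }

    // (max(tasks) - queueCapacity) / (1 / taskcost), read here as threads added on top of corePoolSize
    static int maxPoolSize(int corePoolSize, int peakTasksPerSecond, int queueCapacity, int taskCostMs) {
        int tasksPerThreadPerSecond = 1000 / taskCostMs;
        int extra = Math.max(0, (peakTasksPerSecond - queueCapacity) / tasksPerThreadPerSecond);
        return corePoolSize + extra;
    }

    public static void main(String[] args) {
        int taskCostMs = 100;      // taskcost = 0.1 s
        int responseTimeMs = 1000; // responsetime = 1 s
        int core = threadCount(800, taskCostMs);                      // 80
        int queue = queueCapacity(core, taskCostMs, responseTimeMs);  // 800
        int max = maxPoolSize(core, 1000, queue, taskCostMs);         // 80 + 20 = 100
        System.out.println("threads needed: " + threadCount(500, taskCostMs) + "~" + threadCount(1000, taskCostMs));
        System.out.println("corePoolSize=" + core + ", queueCapacity=" + queue + ", maxPoolSize=" + max);
    }
}
```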
Future:
Before CompletableFuture, we used Future for asynchronous operations, and CompletableFuture implements the Future interface, so let's look at Future first:
Future supports asynchronous, parallel computation. If the main thread has a very time-consuming computation to perform, it can hand that task to an asynchronous thread through a Future and carry on with other work; once that work is done, it uses the Future to get the result.
For example:
Suppose we have two services: one queries the user's basic information, and the other queries the user's housing information.
public class UserInfoService {
    public UserInfo getUserInfo(Long userId) throws InterruptedException {
        Thread.sleep(300); // simulate a time-consuming call
        return new UserInfo("666", "Erie Egg", 27); // normally a database query or a remote call
    }
}

public class HouseService {
    public HouseInfo getHouseInfo(long userId) throws InterruptedException {
        Thread.sleep(500); // simulate a time-consuming call
        return new HouseInfo("666", "Azerara Community");
    }
}
Next, let's demonstrate how to use Future to make asynchronous calls in the main thread.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        UserInfoService userInfoService = new UserInfoService();
        HouseService houseService = new HouseService();
        long userId = 666L;
        long startTime = System.currentTimeMillis();
        // Call the user service to get the user's basic information
        FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() {
            @Override
            public UserInfo call() throws Exception {
                return userInfoService.getUserInfo(userId);
            }
        });
        executorService.submit(userInfoFutureTask);
        Thread.sleep(300); // simulate other time-consuming work on the main thread
        FutureTask<HouseInfo> houseInfoFutureTask = new FutureTask<>(new Callable<HouseInfo>() {
            @Override
            public HouseInfo call() throws Exception {
                return houseService.getHouseInfo(userId);
            }
        });
        executorService.submit(houseInfoFutureTask);
        UserInfo userInfo = userInfoFutureTask.get();    // get the personal information result
        HouseInfo houseInfo = houseInfoFutureTask.get(); // get the housing information result
        System.out.println("total time " + (System.currentTimeMillis() - startTime) + " ms");
        executorService.shutdown(); // otherwise the pool's non-daemon threads keep the JVM running
    }
}
-------------------------------------------------------------------------
Running result: total time 806 ms
Without Future, doing everything serially on the main thread would take about 1100 ms (300 + 300 + 500), so Future does noticeably improve execution efficiency.
Looking more closely, though, Future is not very friendly when it comes to getting results: you can only obtain a task's result by blocking or by polling (plenty of source-code walkthroughs are available online):
- Future.get() is a blocking call: get blocks until the result is available.
- Future provides an isDone method, which the program can poll to check whether the task has finished.
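The two styles look like this in practice. A minimal sketch: the 200 ms sleep stands in for a slow call, and the 10 ms polling interval is an arbitrary choice.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingVsPolling {

    static Future<String> slowTask(ExecutorService pool) {
        return pool.submit(() -> {
            Thread.sleep(200); // simulate a slow call
            return "done";
        });
    }

    // Style 1: get() parks the calling thread until the result is ready.
    static String blocking(ExecutorService pool) throws InterruptedException, ExecutionException {
        return slowTask(pool).get();
    }

    // Style 2: poll isDone(); the caller can do other things, but keeps waking up just to check.
    static String polling(ExecutorService pool) throws InterruptedException, ExecutionException {
        Future<String> future = slowTask(pool);
        int checks = 0;
        while (!future.isDone()) {
            checks++;
            Thread.sleep(10);
        }
        System.out.println("isDone() returned false " + checks + " times");
        return future.get(); // already complete, so this returns immediately
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(blocking(pool));
            System.out.println(polling(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```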
This exposes the problem: blocking runs against the whole idea of asynchronous programming, and polling wastes CPU. Is there a better way? Of course, and others have already found it: CompletableFuture, introduced in JDK 8, which is the focus of this summary. Why does it solve the problem?
Because CompletableFuture provides a mechanism similar to the observer pattern: it can notify the listening party once the task has completed.
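A minimal illustration of that notification style: a callback is registered on a CompletableFuture that nothing has completed yet, and it runs only when complete(...) is called. The manual complete call stands in for whatever asynchronous work would normally finish the future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CallbackNotification {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();

        // Register the "listener" first; nothing blocks here.
        future.thenAccept(result -> events.add("notified with " + result));
        events.add("callback registered");

        // Completing the future triggers the registered callback (here, on the completing thread).
        future.complete("user info");
        events.add("after complete");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [callback registered, notified with user info, after complete]
    }
}
```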
CompletableFuture:
First, let's reuse the Future example, this time implemented with CompletableFuture:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        UserInfoService userInfoService = new UserInfoService();
        HouseService houseService = new HouseService();
        long userId = 666L;
        long startTime = System.currentTimeMillis();
        // Call the user service to get the user's basic information.
        // The services throw the checked InterruptedException, which a Supplier cannot throw, so it is wrapped.
        CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> {
            try { return userInfoService.getUserInfo(userId); }
            catch (InterruptedException e) { throw new CompletionException(e); }
        });
        Thread.sleep(300); // simulate other time-consuming work on the main thread
        CompletableFuture<HouseInfo> completableHouseInfoFuture = CompletableFuture.supplyAsync(() -> {
            try { return houseService.getHouseInfo(userId); }
            catch (InterruptedException e) { throw new CompletionException(e); }
        });
        UserInfo userInfo = completableUserInfoFuture.get(2, TimeUnit.SECONDS); // wait at most 2 s for the personal information
        HouseInfo houseInfo = completableHouseInfoFuture.get();                  // get the housing information result
        System.out.println("total time " + (System.currentTimeMillis() - startTime) + " ms");
    }
}
-------------------------------------------------------------------------
Running result: total time 800 ms
As you can see, the code is much simpler with CompletableFuture. Its supplyAsync method runs the task asynchronously, and you don't need to create a thread pool yourself: by default, tasks run on ForkJoinPool.commonPool().
Next, we can summarize the specific use of CompletableFuture.
Three usage scenarios for CompletableFuture:
- Create asynchronous tasks
- Simple task asynchronous callback
- Multitasking combined processing
Create an asynchronous task:
CompletableFuture generally creates asynchronous tasks with one of two methods, supplyAsync and runAsync:
- supplyAsync executes CompletableFuture tasks and supports return values.
- runAsync executes the CompletableFuture task with no return value.
supplyAsync method:
// Run the task built from the supplier on the default built-in pool, ForkJoinPool.commonPool()
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// Run the task built from the supplier on a custom executor
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync method:
// Run the task built from the runnable on the default built-in pool, ForkJoinPool.commonPool()
public static CompletableFuture<Void> runAsync(Runnable runnable)
// Run the task built from the runnable on a custom executor
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
For example:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FutureTest {
    public static void main(String[] args) {
        // custom thread pool
        ExecutorService executor = Executors.newCachedThreadPool();
        // using runAsync
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync,for the tribe"), executor);
        // using supplyAsync
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsync,for the alliance");
            return "hahahaha";
        }, executor);
        // the runAsync future has no return value, so this prints null
        System.out.println(runFuture.join());
        // the supplyAsync future has a return value
        System.out.println(supplyFuture.join());
        executor.shutdown(); // the thread pool needs to be shut down
    }
}
//output
runAsync,for the tribe
null
supplyAsync,for the alliance
hahahaha
Task asynchronous callback:
There are roughly six kinds of asynchronous task callbacks:
1. thenRun/thenRunAsync
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
In plain terms, CompletableFuture's thenRun runs a second task after the first one completes: when the first task finishes, the callback runs. No argument is passed from the first task to the second, and the second task has no return value. For example:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureThenRunTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("hit first BOSS");
                    return "for the Holy Light";
                }
        );
        CompletableFuture<Void> thenRunFuture = orgFuture.thenRun(() -> {
            System.out.println("Rewards");
        });
        System.out.println(thenRunFuture.get());
    }
}
//output
hit first BOSS
Rewards
null
The difference between thenRun and thenRunAsync is visible in the (JDK 8) source code:
(Note: the same difference applies to thenAccept/thenAcceptAsync, thenApply/thenApplyAsync, and the other pairs introduced later.)
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}
If you pass a custom thread pool when running the first task:
- With thenRun, the second task runs in the thread that completed the first task, so it stays in the same pool (if the first task has already finished when thenRun is called, the callback runs in the calling thread instead).
- With thenRunAsync, the first task uses the pool you passed in, while the second task uses the default async pool, normally ForkJoinPool.commonPool() (unless you pass an executor through the thenRunAsync(Runnable, Executor) overload).
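To check which thread runs each stage, this sketch names the custom pool's threads through a ThreadFactory ("custom-pool" is an arbitrary name chosen for the example) and compares thenRunAsync with and without an explicit executor. The exact names of the default pool's threads are not relied on, since they can vary with JVM configuration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenRunThreads {

    // Returns the thread names for: [first task, thenRunAsync(action), thenRunAsync(action, pool)]
    static String[] threadNames() {
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "custom-pool"));
        String[] names = new String[3];
        try {
            CompletableFuture<Void> first = CompletableFuture.runAsync(
                    () -> names[0] = Thread.currentThread().getName(), pool);
            // No executor given: runs on the default async pool (normally ForkJoinPool.commonPool()).
            CompletableFuture<Void> defaultPool = first.thenRunAsync(
                    () -> names[1] = Thread.currentThread().getName());
            // Executor given: runs back on the custom pool.
            CompletableFuture<Void> samePool = first.thenRunAsync(
                    () -> names[2] = Thread.currentThread().getName(), pool);
            CompletableFuture.allOf(defaultPool, samePool).join();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : threadNames()) {
            System.out.println(name);
        }
    }
}
```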
2. thenAccept/thenAcceptAsync
CompletableFuture's thenAccept runs a callback after the first task completes, passing the first task's result to the callback as its argument; the callback has no return value.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureThenAcceptTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("First CompletableFuture method task");
                    return "for the Holy Light";
                }
        );
        CompletableFuture<Void> thenAcceptFuture = orgFuture.thenAccept((a) -> {
            if ("for the Holy Light".equals(a)) {
                System.out.println("strong");
            }
            System.out.println("oops");
        });
        System.out.println(thenAcceptFuture.get());
    }
}
//output
First CompletableFuture method task
strong
oops
null
3. thenApply/thenApplyAsync
CompletableFuture's thenApply runs a callback after the first task completes, passing the first task's result to the callback as its argument; the callback returns a value, which becomes the result of the new future.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureThenApplyTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("First CompletableFuture method task");
                    return "for the Holy Light";
                }
        );
        CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {
            if ("for the Holy Light".equals(a)) {
                return "oops";
            }
            return "Consider first";
        });
        System.out.println(thenApplyFuture.get());
    }
}
//output
First CompletableFuture method task
oops
4. exceptionally
CompletableFuture's exceptionally runs a callback when a task completes exceptionally; the exception is passed to the callback as its argument, and the callback's return value becomes the result.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureExceptionTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("Current thread name: " + Thread.currentThread().getName());
                    throw new RuntimeException();
                }
        );
        CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
            e.printStackTrace();
            return "crooked? Your program is abnormal";
        });
        System.out.println(exceptionFuture.get());
    }
}
//output
Current thread name: ForkJoinPool.commonPool-worker-1
java.util.concurrent.CompletionException: java.lang.RuntimeException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException
    at cn.eovie.future.FutureWhenTest.lambda$main$0(FutureWhenTest.java:13)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 5 more
crooked? Your program is abnormal
5. whenComplete
CompletableFuture's whenComplete runs a callback after a task completes; the callback receives the result and the exception but has no return value, and the future returned by whenComplete completes with the previous task's result (or its exception).
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureWhenTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("Current thread name: " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "Xiao Chen";
                }
        );
        CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {
            System.out.println("Current thread name: " + Thread.currentThread().getName());
            System.out.println("The last task was completed, and " + a + " was passed over");
            if ("Xiao Chen".equals(a)) {
                System.out.println("666");
            }
            System.out.println("233333");
        });
        System.out.println(rstFuture.get());
    }
}
//output
Current thread name: ForkJoinPool.commonPool-worker-1
Current thread name: ForkJoinPool.commonPool-worker-1
The last task was completed, and Xiao Chen was passed over
666
233333
Xiao Chen
6. handle
CompletableFuture's handle runs a callback after a task completes, passing it both the result and the exception; the callback has a return value, and the future returned by handle completes with that return value.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureHandlerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("Current thread name: " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "Xiao Chen";
                }
        );
        CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {
            System.out.println("The last task was completed, and " + a + " was passed over");
            if ("Xiao Chen".equals(a)) {
                System.out.println("666");
                return "for the Holy Light";
            }
            System.out.println("233333");
            return null;
        });
        System.out.println(rstFuture.get());
    }
}
//output
Current thread name: ForkJoinPool.commonPool-worker-1
The last task was completed, and Xiao Chen was passed over
666
for the Holy Light
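The practical difference between whenComplete and handle shows up when the previous task fails. In this sketch (the IllegalStateException is just a stand-in failure), whenComplete observes the exception but its returned future still completes exceptionally, while handle turns the failure into a normal value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class WhenCompleteVsHandle {

    static CompletableFuture<String> failingTask() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("query failed");
        });
    }

    // whenComplete sees the exception but passes it through unchanged.
    static boolean whenCompleteStillFails() {
        CompletableFuture<String> f = failingTask().whenComplete((result, ex) ->
                System.out.println("whenComplete saw: " + ex));
        try {
            f.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    // handle maps (result, exception) to a new value, so the returned future completes normally.
    static String handleRecovers() {
        return failingTask()
                .handle((result, ex) -> ex != null ? "fallback" : result)
                .join();
    }

    public static void main(String[] args) {
        System.out.println("whenComplete future failed: " + whenCompleteStillFails()); // true
        System.out.println("handle result: " + handleRecovers());                      // fallback
    }
}
```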
Multiple task combination processing:
AND combination relationship:
thenCombine, thenAcceptBoth, and runAfterBoth all combine two CompletableFutures and run a task only after both have completed normally.
The differences:
- thenCombine: the results of both tasks are passed to the specified function as arguments, and it returns a value
- thenAcceptBoth: the results of both tasks are passed to the specified action as arguments, and it returns nothing
- runAfterBoth: the results are not passed as arguments, and it returns nothing
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

public class ThenCombineTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<String> first = CompletableFuture.completedFuture("first async task");
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletableFuture<String> future = CompletableFuture
                // second async task
                .supplyAsync(() -> "second async task", executor)
                // the (s, w) -> {...} function is the third task: s is this future's result, w is first's result
                .thenCombineAsync(first, (s, w) -> {
                    System.out.println(w);
                    System.out.println(s);
                    return "Both tasks are executed before output";
                }, executor);
        System.out.println(future.join());
        executor.shutdown();
    }
}
//output
first async task
second async task
Both tasks are executed before output
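The example above covers only thenCombine. For comparison, here is a sketch of all three AND variants on already-completed futures (the values "user" and "house" are placeholders), so the callbacks run immediately and the output order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AndCombinationTest {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> user = CompletableFuture.completedFuture("user");
        CompletableFuture<String> house = CompletableFuture.completedFuture("house");

        // thenCombine: both results in, a new value out
        log.add(user.thenCombine(house, (u, h) -> u + "+" + h).join());

        // thenAcceptBoth: both results in, nothing out (CompletableFuture<Void>)
        user.thenAcceptBoth(house, (u, h) -> log.add("accepted " + u + " and " + h)).join();

        // runAfterBoth: no results in, nothing out; it only waits for both to finish
        user.runAfterBoth(house, () -> log.add("both finished")).join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```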
OR combination relationship:
applyToEither, acceptEither, and runAfterEither all combine two CompletableFutures and run a task as soon as either one of them has completed.
The differences:
- applyToEither: the result of the task that finished first is passed to the specified function as an argument, and it returns a value
- acceptEither: the result of the task that finished first is passed to the specified action as an argument, and it returns nothing
- runAfterEither: the result is not passed as an argument, and it returns nothing
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AcceptEitherTest {
    public static void main(String[] args) {
        // First asynchronous task: sleeps for 2 seconds so that it finishes later
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000L);
                System.out.println("Execute the first asynchronous task");
            } catch (Exception e) {
                return "The first task is abnormal";
            }
            return "first async task";
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture
                // second async task
                .supplyAsync(() -> {
                    System.out.println("finish the second task");
                    return "The first task is still sleeping, this is the second task";
                }, executor)
                // third task
                .acceptEitherAsync(first, System.out::println, executor);
        future.join(); // wait for the third task before shutting the executor down
        executor.shutdown();
    }
}
//output
finish the second task
The first task is still sleeping, this is the second task
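applyToEither is the variant with a return value. In this sketch, one future is a bare new CompletableFuture<>() that nothing ever completes, so the result deterministically comes from the other one.

```java
import java.util.concurrent.CompletableFuture;

public class ApplyToEitherTest {

    static String firstResult() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed in this sketch
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast result");
        // Whichever future completes first supplies the argument; the function's return value is the result.
        return slow.applyToEither(fast, s -> s.toUpperCase()).join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // FAST RESULT
    }
}
```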
AllOf:
The CompletableFuture returned by allOf completes after all the tasks have completed. If any task completes exceptionally, calling get on the allOf future throws an exception.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AllOfFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
            System.out.println("I'm done");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("I also finished");
        });
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m, k) -> {
            System.out.println("finish");
        });
        allOfFuture.join(); // the common pool uses daemon threads, so without this main could exit first
    }
}
//output
I'm done
I also finished
finish
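Two points about allOf are worth seeing in code: it returns CompletableFuture<Void>, so the individual results have to be read from the original futures afterwards, and a single failed task makes the combined future fail. A sketch with placeholder tasks:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class AllOfResultsTest {

    // Waits for all futures, then collects their results in the original order.
    static List<Integer> collect(List<CompletableFuture<Integer>> futures) {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // Returns true if get() on the allOf future throws because one task failed.
    static boolean failsIfAnyFails() throws InterruptedException {
        CompletableFuture<Integer> ok = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> bad = CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException("task failed");
        });
        try {
            CompletableFuture.allOf(ok, bad).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalArgumentException;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        System.out.println(collect(futures));  // [1, 2, 3]
        System.out.println(failsIfAnyFails()); // true
    }
}
```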
AnyOf:
The CompletableFuture returned by anyOf completes as soon as any one of the tasks completes. If that first completed task failed, calling get on the anyOf future throws an exception.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AnyOfFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("I'm done");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("I also finished");
        });
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m, k) -> {
            System.out.println("finish");
            // return "The little boy who picked up the snails";
        });
        anyOfFuture.join();
    }
}
//output
I also finished
finish
thenCompose:
After a task completes, thenCompose passes its result as the argument to the specified function, which must itself return a CompletionStage (such as another CompletableFuture). thenCompose returns a new CompletableFuture that completes with the result of that returned stage, so the futures are flattened instead of being nested as CompletableFuture<CompletableFuture<T>>, which is what thenApply would produce.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenComposeTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> f = CompletableFuture.completedFuture("first task");
        // second async task
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> "second task", executor)
                .thenComposeAsync(data -> {
                    System.out.println(data);
                    return f; // return the first task's future as the next stage
                }, executor);
        System.out.println(future.join());
        executor.shutdown();
    }
}
//output
second task
first task
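The reason to reach for thenCompose is visible in the types. With a hypothetical loadHouse step that itself returns a CompletableFuture, thenApply nests the futures while thenCompose flattens them:

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApply {

    // Hypothetical async step that depends on the previous result.
    static CompletableFuture<String> loadHouse(String userId) {
        return CompletableFuture.supplyAsync(() -> "house of " + userId);
    }

    static String composed() {
        CompletableFuture<String> user = CompletableFuture.completedFuture("666");

        // thenApply wraps the returned future in another future: CompletableFuture<CompletableFuture<String>>
        CompletableFuture<CompletableFuture<String>> nested = user.thenApply(ComposeVsApply::loadHouse);
        String viaApply = nested.join().join(); // needs two joins

        // thenCompose flattens it: CompletableFuture<String>
        CompletableFuture<String> flat = user.thenCompose(ComposeVsApply::loadHouse);
        String viaCompose = flat.join();

        return viaApply.equals(viaCompose) ? viaCompose : "mismatch";
    }

    public static void main(String[] args) {
        System.out.println(composed()); // house of 666
    }
}
```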
Parts of this article draw on the WeChat official account "Little boy picking up snails".