(1) Fork/Join framework explanation
Fork/Join is a framework introduced in Java 7 for executing tasks in parallel. It splits a large task into several smaller tasks and then combines the results of the small tasks to produce the result of the large task.
Operation flow chart, as shown below
(2) Work stealing algorithm
Work stealing algorithm: a thread that has run out of work steals tasks from another thread's queue and executes them.
For example, a relatively large task can be divided into several independent subtasks. To reduce contention between threads, these subtasks are placed in different queues,
and a separate thread is created for each queue to execute the tasks in it. Threads and queues correspond one to one.
When one thread has finished the tasks in its own queue while other threads still have tasks pending, it steals a task from another thread's queue instead of sitting idle.
At that point the stealing thread and the owning thread access the same queue. To reduce contention between them, a double-ended queue is usually used:
The thread whose tasks are being stolen always takes tasks from the head of the queue
The thread that steals always takes tasks from the tail of the queue
Advantages: makes full use of threads for parallel computation and reduces contention between threads
Disadvantages: contention still occurs in some cases, for example when only one task is left in a queue, and the algorithm consumes more resources, such as creating multiple threads and multiple double-ended queues
Double-ended queue (deque): a data structure that combines the properties of a queue and a stack. Elements can be inserted and removed at both ends, and insertion and removal are restricted to those two ends.
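The two ends of the queue can be illustrated with a small single-threaded sketch. It only shows which end each role takes from; it is not how ForkJoinPool is implemented, and the class name DequeEndsSketch and the task labels are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Single-threaded illustration only: shows which end of the deque each role uses.
class DequeEndsSketch {

    // Alternates between the owner (head) and a thief (tail) until the deque is empty,
    // and returns the order in which tasks were taken.
    static List<String> drain(List<String> tasks) {
        Deque<String> queue = new ArrayDeque<>(tasks);
        List<String> log = new ArrayList<>();
        while (!queue.isEmpty()) {
            // The owning thread takes from the head of its own queue.
            log.add("owner:" + queue.pollFirst());
            // A thread that has run out of work steals from the tail.
            String stolen = queue.pollLast();
            if (stolen != null) {
                log.add("thief:" + stolen);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [owner:t1, thief:t5, owner:t2, thief:t4, owner:t3]
        System.out.println(drain(Arrays.asList("t1", "t2", "t3", "t4", "t5")));
    }
}
```

Because the two roles work from opposite ends, they only meet when a single task is left, which is the contention case mentioned under the disadvantages.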
(3) Fork/Join framework design
- Split task
Divide a large task into subtasks, splitting further until each subtask is small enough to execute directly
- Execute tasks and merge results
The split subtasks are placed in several double-ended queues, and several threads are started to take tasks from those queues and execute them.
ForkJoinTask: provides the mechanism for performing fork() and join() operations within a task. Instead of extending it directly, use one of its subclasses:
RecursiveAction: used for tasks that do not return results
RecursiveTask: used for tasks with returned results
ForkJoinPool: ForkJoinTask needs to be executed through ForkJoinPool
Subtasks forked from a task are added to the double-ended queue maintained by the current worker thread, at the head of the queue. When a worker thread's own queue is temporarily empty, it takes a task from the tail of another worker thread's queue, chosen at random.
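For tasks that return no result, a RecursiveAction can be sketched as follows. The class name DoubleAction, the threshold of 4, and the helper doubleAll are assumptions chosen for illustration; the task doubles every element of an array in place.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: doubles every element of an int array in place.
class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // arbitrary value for illustration

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly.
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
        } else {
            // Too large: split in half and run both halves, waiting for both to finish.
            int middle = (from + to) >>> 1;
            invokeAll(new DoubleAction(data, from, middle),
                      new DoubleAction(data, middle, to));
        }
    }

    // Convenience helper (not part of the framework): runs the action in its own pool.
    static int[] doubleAll(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
        return data;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubleAll(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})));
    }
}
```

The two halves cover disjoint index ranges, so the subtasks never write to the same element and need no extra synchronization.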
(4) Using the Fork/Join framework
Example: calculate 1+2+3+4
package com.example.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @author zhangchaocai
 * @create 2020-07-05 13:56
 */
public class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2; // threshold
    private final int start;
    private final int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // Compute directly if the task is small enough
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // If the task is larger than the threshold, split it into two subtasks
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            // Execute the subtasks
            leftTask.fork();
            rightTask.fork();
            // Wait for the subtasks to finish and get their results
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // Merge the subtask results
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Create a task that calculates 1+2+3+4
        CountTask task = new CountTask(1, 4);
        // Submit the task for execution
        ForkJoinTask<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        // An exception thrown inside a ForkJoinTask does not reach the main thread on its own.
        // isCompletedAbnormally() reports whether the task threw an exception or was cancelled,
        // and getException() returns the exception (or a CancellationException if cancelled).
        if (result.isCompletedAbnormally()) {
            System.out.println(result.getException());
        }
    }
}
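A common variation on the CountTask example forks only one subtask and computes the other in the current thread, so the current worker does useful work instead of only waiting. The sketch below assumes a hypothetical class RangeSumTask, an arbitrary threshold of 1,000, and a helper method sum; it uses long to avoid overflow on larger ranges.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variation: sums the integers in [start, end], forking only one half.
class RangeSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary value for illustration

    private final int start; // inclusive
    private final int end;   // inclusive

    RangeSumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        int middle = start + (end - start) / 2;
        RangeSumTask left = new RangeSumTask(start, middle);
        RangeSumTask right = new RangeSumTask(middle + 1, end);
        left.fork();                        // queue the left half so another worker can steal it
        long rightResult = right.compute(); // run the right half in the current thread
        return left.join() + rightResult;   // wait for the left half and merge
    }

    // Convenience helper (not part of the framework): runs the task in its own pool.
    static long sum(int start, int end) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSumTask(start, end));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 4));       // prints 10
        System.out.println(sum(1, 100_000)); // prints 5000050000
    }
}
```

Here invoke() is used instead of submit() because it blocks until the result is available and returns it directly.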
(5) Fork/Join implementation principle
A ForkJoinPool consists of a ForkJoinTask array and a ForkJoinWorkerThread array
ForkJoinTask array: stores the tasks that the program submits to the ForkJoinPool
ForkJoinWorkerThread array: the worker threads that execute these tasks
- fork method of ForkJoinTask
- join method of ForkJoinTask
join() calls doJoin(), which returns the current status of the task; that status determines what join() returns. There are four task statuses
Completed (NORMAL)
Cancelled (CANCELLED)
Signal (SIGNAL)
Exception occurred (EXCEPTIONAL)
If the task status is completed, the task result is returned directly.
If the task status is cancelled, a CancellationException is thrown directly
If the task status is exception occurred, the corresponding exception is thrown directly
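The cancelled and exceptional outcomes of join() can be observed from calling code using only public methods. The sketch below is an illustration under assumptions: JoinOutcomeSketch, FailingTask, and ConstantTask are hypothetical names, and the code does not touch the internal doJoin() method or the status constants.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of how join() behaves for failed and cancelled tasks.
class JoinOutcomeSketch {

    // A task that always fails.
    static class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    // A task that would return a constant if it ever ran.
    static class ConstantTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    // Returns true if join() rethrew the failure and the task reports abnormal completion.
    static boolean joinRethrowsFailure() {
        ForkJoinPool pool = new ForkJoinPool();
        FailingTask task = new FailingTask();
        try {
            pool.submit(task).join();
            return false; // not reached: compute() always throws
        } catch (RuntimeException e) {
            return task.isCompletedAbnormally() && task.getException() != null;
        } finally {
            pool.shutdown();
        }
    }

    // Returns true if join() on a cancelled task throws CancellationException.
    static boolean joinThrowsOnCancelled() {
        ConstantTask task = new ConstantTask();
        task.cancel(false); // cancelled before it was ever forked or submitted
        try {
            task.join();
            return false;
        } catch (CancellationException e) {
            return task.isCancelled();
        }
    }

    public static void main(String[] args) {
        System.out.println("failure rethrown by join: " + joinRethrowsFailure());
        System.out.println("cancelled task throws:    " + joinThrowsOnCancelled());
    }
}
```

Unlike get(), join() throws unchecked exceptions, so the caller is not forced to handle InterruptedException or ExecutionException.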