Fork/Join framework JUC

(1) Fork/Join framework explanation

Fork/Join is a framework introduced in Java 7 for executing tasks in parallel. It splits a large task into several smaller tasks, then combines the results of the small tasks to produce the result of the large task.

Operation flow chart, as shown below

  

 

(2) Work stealing algorithm

Work stealing algorithm: a thread steals tasks from other threads' queues and executes them.

For example, a relatively large task can be divided into several independent subtasks. To reduce contention between threads, the subtasks are placed in different queues, and a separate thread is created for each queue to execute its tasks, so threads and queues correspond one to one.

 

When a thread has finished the tasks in its own queue while other threads still have work left, it steals a task from another thread's queue and executes it.

At that point the two threads access the same queue at the same time. To reduce contention, a double-ended queue is usually used:

The thread being stolen from always takes tasks from the head of the queue

The stealing thread always takes tasks from the tail of the queue
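
The head/tail rule can be illustrated with a small single-threaded simulation. This is only a sketch under assumptions: the class name StealDemo, the two queues and the task names are made up for illustration, and the code models the idea rather than how ForkJoinPool is actually implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only: models two worker queues in a single thread.
class StealDemo {

    // Runs the simulation and returns a log of who executed which task.
    static List<String> run() {
        Deque<String> queueA = new ArrayDeque<>();
        Deque<String> queueB = new ArrayDeque<>();
        List<String> log = new ArrayList<>();

        // Worker A owns three tasks, worker B owns one.
        queueA.addLast("A1");
        queueA.addLast("A2");
        queueA.addLast("A3");
        queueB.addLast("B1");

        // Each owner takes a task from the head of its own queue.
        log.add("A runs " + queueA.pollFirst());
        log.add("B runs " + queueB.pollFirst());

        // B's queue is now empty, so B steals from the tail of A's queue,
        // while A keeps taking tasks from the head.
        if (queueB.isEmpty()) {
            log.add("B steals " + queueA.pollLast());
        }
        log.add("A runs " + queueA.pollFirst());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the owner and the thief work at opposite ends, they only touch the same element when a single task is left in the queue.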

 

Advantages: makes full use of threads for parallel computation and reduces contention between threads

Disadvantages: contention still occurs in some cases, for example when only one task is left in a queue, and the algorithm consumes more system resources because it creates multiple threads and multiple double-ended queues

Double-ended queue (deque): a data structure that has the properties of both a queue and a stack. Elements can be inserted and removed at both ends, and insertion and removal are restricted to those two ends.
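
As a quick illustration of the two-ended behaviour, here is a minimal sketch using java.util.ArrayDeque from the standard library. The class name DequeDemo and the values are assumptions chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical demo class: a deque accepts and releases elements at both ends.
class DequeDemo {

    static String demo() {
        Deque<Integer> deque = new ArrayDeque<>();
        deque.addFirst(2);   // insert at the head: [2]
        deque.addFirst(1);   // insert at the head: [1, 2]
        deque.addLast(3);    // insert at the tail: [1, 2, 3]

        int head = deque.pollFirst();  // remove from the head
        int tail = deque.pollLast();   // remove from the tail
        // One element is left in the middle; peekFirst() reads it without removing it.
        return head + "," + tail + "," + deque.peekFirst();
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints 1,3,2
    }
}
```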

(3) Fork/Join framework design

  • Split task

Divide a large task into small tasks

  • Execute tasks and merge results

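The split subtasks are placed in several double-ended queues, and several worker threads are started to take tasks from these queues and execute them. In code, splitting and merging are expressed with fork(), which schedules a subtask for asynchronous execution, and join(), which waits for a subtask and returns its result.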

ForkJoinTask: the abstract task type. You normally do not extend it directly but use one of its subclasses

RecursiveAction: for tasks that do not return a result

RecursiveTask: for tasks that return a result

ForkJoinPool: a ForkJoinTask must be executed through a ForkJoinPool
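
For the case without a return value, a RecursiveAction could look like the sketch below. The class name DoubleAction, the THRESHOLD value and the helper doubleAll() are assumptions made up for this illustration; invokeAll(), invoke() and shutdown() are standard ForkJoinTask and ForkJoinPool methods.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: doubles every element of an array in place.
class DoubleAction extends RecursiveAction {

    private static final int THRESHOLD = 2; // assumed cut-off for splitting

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly.
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
        } else {
            // Too large: split into two halves and run both to completion.
            int middle = (from + to) >>> 1;
            invokeAll(new DoubleAction(data, from, middle),
                      new DoubleAction(data, middle, to));
        }
    }

    // Helper (assumption): runs the action in a fresh pool and returns the array.
    static int[] doubleAll(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
        return data;
    }

    public static void main(String[] args) {
        // prints [2, 4, 6, 8, 10]
        System.out.println(Arrays.toString(doubleAll(new int[]{1, 2, 3, 4, 5})));
    }
}
```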

    

Subtasks split off from a task are added to the head of the double-ended queue maintained by the current worker thread. When a worker thread's own queue is temporarily empty, it takes a task from the tail of the queue of another, randomly chosen worker thread.

(4) Using the Fork/Join framework

Example: calculate 1+2+3+4

package com.example.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @author zhangchaocai
 * @create 2020-07-05 13:56
 */
public class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2; //threshold

    private int start;

    private int end;

    public CountTask(int start,int end){
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //Calculate tasks if they are small enough
        boolean canCompute = (end -start) <= THRESHOLD;

        if(canCompute){
            for(int i = start ; i <= end ; i++){
                sum += i;
            }
        }else{
            //If the task is greater than the threshold, it will be split into two subtasks for calculation
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start,middle);
            CountTask rightTask = new CountTask(middle + 1, end);

            //Execute subtasks
            leftTask.fork();
            rightTask.fork();

            //Wait for the execution of subtasks and get their results
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            //Merge subtasks
            sum = leftResult + rightResult;
        }

        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        //Generate a calculation task to calculate 1+2+3+4
        CountTask task = new CountTask(1, 4);

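        //Submit the task; submit() returns immediately with a handle to the result
        ForkJoinTask<Integer> result = forkJoinPool.submit(task);

        try {
            //get() blocks until the task has completed
            System.out.println(result.get());
        }catch (InterruptedException e){
            e.printStackTrace();
        }catch (ExecutionException e){
            e.printStackTrace();
        }

        //A ForkJoinTask may throw an exception during execution, and that exception
        //does not propagate to the main thread on its own.
        //ForkJoinTask provides isCompletedAbnormally() to check whether the task has
        //thrown an exception or has been cancelled, and getException() to retrieve it.
        //The check is only meaningful once the task has completed, so it comes after get().
        if(result.isCompletedAbnormally()){
            System.out.println(result.getException());
        }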
    }
}
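
CountTask forks both halves and then joins them. A commonly used alternative is to fork only one half and compute the other half directly in the current thread, so that the current worker does part of the work itself instead of only waiting. The sketch below shows that variant under assumptions: the class name RangeSumTask, the THRESHOLD value of 1,000 and the helper sum() are made up for illustration and are not part of the example above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of a range-sum task using the fork/compute/join pattern.
class RangeSumTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 1_000; // assumed cut-off for splitting

    private final int start; // inclusive
    private final int end;   // inclusive

    RangeSumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: sum the range directly.
            long sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        int middle = start + (end - start) / 2;
        RangeSumTask left = new RangeSumTask(start, middle);
        RangeSumTask right = new RangeSumTask(middle + 1, end);

        left.fork();                        // schedule the left half asynchronously
        long rightResult = right.compute(); // compute the right half in this thread
        long leftResult = left.join();      // then wait for the left half
        return leftResult + rightResult;
    }

    // Helper (assumption): runs the task in a fresh pool and shuts the pool down.
    static long sum(int start, int end) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSumTask(start, end));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 4));        // prints 10
        System.out.println(sum(1, 100_000));  // prints 5000050000
    }
}
```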

 

 

(5) Fork/Join implementation principle

ForkJoinPool consists of a ForkJoinTask array and a ForkJoinWorkerThread array.

ForkJoinTask array: stores the tasks that the program submits to the ForkJoinPool

ForkJoinWorkerThread array: the worker threads responsible for executing these tasks
 
  • fork method of ForkJoinTask

    

 

  • join method of ForkJoinTask

      

 

join() calls the doJoin() method. doJoin() obtains the status of the current task, and that status determines what join() returns. There are four task statuses:

Completed (NORMAL)

Cancelled (CANCELLED)

Signal (SIGNAL)

Exception occurred (EXCEPTIONAL)

If the task status is completed, the task result is returned directly.

If the task status is cancelled, a CancellationException is thrown directly.

If the task status is exception occurred, the corresponding exception is thrown directly.
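
The three outcomes of join() can be observed from calling code. The following is a minimal sketch, assuming hypothetical classes JoinOutcomeDemo, OkTask and FailingTask invented for this illustration. It only uses the public API (join(), cancel(), submit()) and does not inspect the internal status field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of what join() does for each final task status.
class JoinOutcomeDemo {

    // Task that completes normally.
    static class OkTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    // Task that always fails.
    static class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    // Joins the task and reports which of the three outcomes occurred.
    static String outcome(ForkJoinTask<Integer> task) {
        try {
            return "result " + task.join();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (RuntimeException e) {
            return "failed with " + e.getClass().getSimpleName();
        }
    }

    static List<String> run() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            List<String> out = new ArrayList<>();
            out.add(outcome(pool.submit(new OkTask())));      // completed normally
            out.add(outcome(pool.submit(new FailingTask()))); // exception occurred

            OkTask cancelled = new OkTask();
            cancelled.cancel(false); // cancelled before it is ever run
            out.add(outcome(cancelled));
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Unlike get(), join() does not wrap a failure in an ExecutionException, which is why the sketch catches RuntimeException directly.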

     

 

 

 

 

 


 

Posted by micheal_newby on Thu, 02 Jun 2022 00:33:08 +0530