Source code analysis from scratch (threads)

Preface

This article fills in the parts of the multithreading source code that earlier articles left out, mainly:

  • Callable and FutureTask
  • Condition
  • CountDownLatch
  • CyclicBarrier
  • Semaphore (semaphore)

Thread with return value

Callable

When learning about the thread pool, we found that ThreadPoolExecutor does not define a submit method itself. It inherits submit from the abstract class AbstractExecutorService, which uses the adapter pattern to convert a Runnable into a Callable.
Let's first look at the Callable interface, placed side by side with the Runnable interface for comparison.
You can see that run is an abstract method with no return value, whereas call not only returns a value but can also throw checked exceptions.

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
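
To make the difference concrete, here is a minimal sketch. The class and method names (CallableVsRunnable, sumWithRunnable, sumWithCallable) are made up for illustration. The Runnable version has to pass its result out through shared state, while the Callable version simply returns it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class CallableVsRunnable {

    // Sums 0..99 in a Runnable: run() returns nothing, so the result
    // must be written to shared state that the caller reads later.
    static int sumWithRunnable() throws InterruptedException {
        AtomicInteger holder = new AtomicInteger();
        Runnable r = () -> {
            int sum = 0;
            for (int i = 0; i < 100; i++) sum += i;
            holder.set(sum);
        };
        Thread t = new Thread(r);
        t.start();
        t.join(); // wait for the worker thread to finish
        return holder.get();
    }

    // Sums 0..99 in a Callable: call() returns the value directly,
    // and the caller picks it up through the Future.
    static int sumWithCallable() throws Exception {
        Callable<Integer> c = () -> {
            int sum = 0;
            for (int i = 0; i < 100; i++) sum += i;
            return sum;
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> f = pool.submit(c);
            return f.get(); // blocks until call() has returned
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumWithRunnable());
        System.out.println(sumWithCallable());
    }
}
```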

FutureTask

Let's take a direct look at the application of Callable - FutureTask.

Inheritance and interfaces

As usual, let's first look at the inheritance hierarchy. FutureTask implements the RunnableFuture interface, which in turn extends Runnable and Future<V>. So what is Future<V>?

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
	void run();
}

Generally speaking, Future is an interface that represents the result and status of an asynchronous task. Calling the get method blocks the calling thread until the task finishes (normal completion, exception, or cancellation) or the timeout expires.

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
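
The blocking behaviour of get can be sketched as follows. This is only an illustration under assumed names (FutureGetDemo, timedGet, failingGet) and assumed delays (100 ms timeout, 2 s task). It shows a timed get giving up, and an exception thrown inside call() arriving at the caller wrapped in an ExecutionException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the JDK.
class FutureGetDemo {

    // Waits at most 100 ms for a task that needs 2 s, then cancels it.
    static String timedGet() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = pool.submit(() -> {
                Thread.sleep(2000);
                return "done";
            });
            try {
                return f.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                f.cancel(true); // interrupt the sleeping worker
                return "timeout, cancelled=" + f.isCancelled();
            }
        } finally {
            pool.shutdown();
        }
    }

    // An exception thrown by call() is rethrown by get() as the cause
    // of an ExecutionException.
    static String failingGet() throws InterruptedException {
        Callable<String> failing = () -> {
            throw new IllegalStateException("boom");
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = pool.submit(failing);
            try {
                f.get();
                return "no exception";
            } catch (ExecutionException e) {
                return e.getCause().getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(timedGet());
        System.out.println(failingGet());
    }
}
```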

Member variable

Like a thread, a FutureTask has a state, which is stored in a member variable.

// Task state
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** The underlying callable; set to null after running */
private Callable<V> callable;
/** The result to return, or the exception to throw, from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; used to interrupt it when the task is cancelled */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
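
The state field itself is private, but its effect can be observed through isDone() and isCancelled(). The following is a small sketch (the class name FutureTaskStateDemo is made up) that runs one task to completion and cancels another before it starts. The state names in the comments refer to the constants listed above.

```java
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class FutureTaskStateDemo {

    static String observe() {
        FutureTask<Integer> normal = new FutureTask<Integer>(() -> 1);
        boolean before = normal.isDone(); // state is NEW, so not done
        normal.run();                     // runs in the calling thread and completes normally
        boolean after = normal.isDone();  // state is no longer NEW

        FutureTask<Integer> cancelled = new FutureTask<Integer>(() -> 1);
        cancelled.cancel(false);          // NEW -> CANCELLED
        cancelled.run();                  // does nothing: the state is no longer NEW
        return before + "," + after + ","
                + cancelled.isCancelled() + "," + cancelled.isDone();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```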

Constructor

FutureTask accepts not only a Callable but also a Runnable. If a Runnable is passed in, it is converted into a Callable by the Executors.callable method.

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

This method in Executors is the adapter pattern we mentioned earlier. The adapter implements the Callable interface. Because a Runnable produces no result of its own, you need to pass in the object to be returned.

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
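
A short sketch of the adapter in use, under made-up names (AdapterDemo, runAdapted, viaFutureTask). The adapted Callable runs the Runnable and then hands back the fixed result that was supplied up front. The same happens when a Runnable is given to the two-argument FutureTask constructor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class AdapterDemo {

    // Wraps a Runnable with Executors.callable and invokes it directly.
    static String runAdapted() throws Exception {
        StringBuilder log = new StringBuilder();
        Runnable task = () -> log.append("ran");
        Callable<String> adapted = Executors.callable(task, "fixed result");
        String result = adapted.call(); // runs task on the calling thread
        return log + ":" + result;
    }

    // FutureTask(Runnable, V) performs the same adaptation internally.
    static Integer viaFutureTask() throws Exception {
        Runnable noop = () -> { };
        FutureTask<Integer> ft = new FutureTask<>(noop, 42);
        ft.run(); // run synchronously in the calling thread
        return ft.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAdapted());
        System.out.println(viaFutureTask());
    }
}
```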

Usage

Code source: https://blog.csdn.net/qq_40685275/article/details/99838677

public static void main(String[] args){
	FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
	Thread thread = new Thread(futureTask);
	thread.setName("Task thread");
	thread.start();
	
	int result = 0;
	try {
	    // Call get() to obtain the task result. If the task has not completed yet, block and wait
	    result = futureTask.get();
	} catch (Exception e) {
	    e.printStackTrace();
	}
	System.out.println("result is " + result);
}

 
// Implement the Callable interface and its call() method. The generic parameter is the type of the returned result
static class Task implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int result = 0;
        for (int i = 0; i < 100; ++i) {
            result += i;
        }
        Thread.sleep(3000);
        return result;
    }
}

Condition

Condition is essentially an interface. Let's look directly at the methods in the interface.

public interface Condition {
    // Adds the current thread to the condition's wait queue and releases the lock. After another thread calls signal() or signalAll(), the awakened thread re-acquires the lock before await() returns. Similar to Object.wait().
    void await() throws InterruptedException;
    // Precondition: the current thread holds the lock bound to this Condition object. Otherwise an IllegalMonitorStateException is typically thrown.
    // Unlike await(), this method does not respond to interruption: the wait normally ends only when another thread calls signal() or signalAll() on this Condition. If the current thread is interrupted while waiting, it keeps waiting, and its interrupted status is still set when the method returns.
    void awaitUninterruptibly();
    // Precondition: the current thread holds the lock bound to this Condition object. Otherwise an IllegalMonitorStateException is typically thrown.
    // nanosTimeout is the maximum time to wait for a signal, in nanoseconds. If signal() or signalAll() arrives within that time, the method returns an estimate of nanosTimeout minus the time spent waiting.
    // If another thread interrupts the waiting thread within that time, an InterruptedException is thrown and the thread's interrupted status is cleared. If no signal arrives within the timeout, 0 or a negative number is returned.
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // Wakes up one thread in the condition's wait queue. Similar to Object.notify().
    void signal();
    void signalAll();
}

Generally speaking, a Condition must be used together with a Lock. Object.wait() works with an object's monitor (synchronized), while Condition.await() works with AQS-based locks.
Code source: https://www.cnblogs.com/gemine/p/9039012.html

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionUseCase {

    public Lock lock = new ReentrantLock();
    public Condition condition = lock.newCondition();

    public static void main(String[] args)  {
        ConditionUseCase useCase = new ConditionUseCase();
        ExecutorService executorService = Executors.newFixedThreadPool (2);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                useCase.conditionWait();
            }
        });
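        executorService.execute(new Runnable() {
            @Override
            public void run() {
                useCase.conditionSignal();
            }
        });
        // Stop accepting new tasks so the JVM can exit once both tasks have finished
        executorService.shutdown();
    }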

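    public void conditionWait()  {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " got the lock");
            System.out.println(Thread.currentThread().getName() + " is waiting for the signal");
            // Note: if conditionSignal() happened to run first, this await() would miss the signal
            condition.await();
            System.out.println(Thread.currentThread().getName() + " received the signal");
        } catch (InterruptedException e) {
            // Restore the interrupted status instead of swallowing the exception
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
    public void conditionSignal() {
        lock.lock();
        try {
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " got the lock");
            condition.signal();
            System.out.println(Thread.currentThread().getName() + " sent the signal");
        } catch (InterruptedException e) {
            // Restore the interrupted status instead of swallowing the exception
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }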

}
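
A signal is not remembered: if signal() is called before the other thread reaches await(), the waiter blocks with nothing left to wake it. A common way to guard against this is to pair the Condition with a state flag and wait in a loop. The following is a minimal sketch of that pattern, not the code from the source above. The names GuardedCondition, awaitReady, markReady and signalBeforeWait are made up for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class GuardedCondition {
    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag; // guarded by lock

    // Waits until the flag is set. The loop re-checks the flag after
    // every wake-up, which also covers spurious wake-ups.
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!flag) {
                ready.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Sets the flag and wakes up all waiters.
    void markReady() {
        lock.lock();
        try {
            flag = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Signals first and waits afterwards. Returns true if the waiter
    // still finished, which it does because the flag is checked first.
    static boolean signalBeforeWait() throws InterruptedException {
        GuardedCondition gc = new GuardedCondition();
        gc.markReady();
        Thread waiter = new Thread(() -> {
            try {
                gc.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        waiter.join(2000);
        return !waiter.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(signalBeforeWait());
    }
}
```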

Synchronization tool

CountDownLatch

The code of CountDownLatch is very short and the principle is simple, so let's go through it briefly.
Similar to ReentrantLock, CountDownLatch implements a Sync synchronizer internally, but it uses the shared mode of AQS.

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    Sync(int count) {
        setState(count);
    }
    int getCount() {
        return getState();
    }
    protected int tryAcquireShared(int acquires) {
        // Acquisition succeeds only once the count has reached 0
        return (getState() == 0) ? 1 : -1;
    }
    // Try to release. When the count drops to 0 this returns true, and AQS (in the JDK 8 implementation) then calls doReleaseShared,
    // which wakes up the successor of the queue's head node
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

The constructor only initializes the Sync synchronizer and sets state to the given count, which must not be negative.

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Let's look at the await() and countDown() methods:

public void await() throws InterruptedException {
	//Try to acquire the shared lock, and throw an exception if it is interrupted
    sync.acquireSharedInterruptibly(1);
}
public void countDown() {
    sync.releaseShared(1);
}

At this point, it is not difficult to guess the implementation principle of CountDownLatch:

  • During initialization, state is set to the given non-negative count.
  • When await() is called and state is not 0, the shared lock cannot be obtained, so the thread is added to the AQS wait queue.
  • Every call to countDown() decrements state through CAS. When state reaches 0, the first waiting node (the successor of the head node) is woken up.
  • The awakened node is in shared mode, so it passes the wake-up on to its successor, and in this way all the nodes in the queue are awakened.
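
From the caller's side, the steps above look like this. The sketch uses made-up names (LatchDemo, runWorkers): each worker calls countDown() once, and the calling thread blocks in await() until the count reaches 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class LatchDemo {

    // Starts n workers and waits until every one of them has counted down.
    // Returns how many workers had finished when await() returned.
    static int runWorkers(int n) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(n);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // the "work"
                done.countDown();           // state: c -> c - 1 via CAS
            }).start();
        }
        done.await(); // blocks while state != 0
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(5));
    }
}
```

Because every worker increments the counter before counting down, the value read after await() returns is always n.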

CyclicBarrier

CyclicBarrier differs from the synchronization tools covered so far. Instead of implementing its own Sync synchronizer on top of AQS, it is built on a ReentrantLock and a Condition.

Member variable

private final ReentrantLock lock = new ReentrantLock();
// Condition on which threads wait until the barrier is tripped
private final Condition trip = lock.newCondition();
// Total number of participating threads
private final int parties;
// Optional action to run when the barrier is tripped, before the waiting threads are released
private final Runnable barrierCommand;
// The current generation; its broken flag records whether the barrier has been broken
private Generation generation = new Generation();
// Number of parties still being waited for in the current generation
private int count;

Construction method

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
    this(parties, null);
}

await

With CyclicBarrier, every participating thread calls the await() method and blocks there. Execution continues only after all threads have reached await().
So let's look at its await() method directly. The await() method only calls the dowait() method.

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
private int dowait(boolean timed, long nanos)
   throws InterruptedException, BrokenBarrierException,
          TimeoutException {
   final ReentrantLock lock = this.lock;
   //Acquire lock
   lock.lock();
   try {
       final Generation g = generation;
       // The barrier is already broken: fail immediately
       if (g.broken)
           throw new BrokenBarrierException();
       // If the current thread has been interrupted, break the barrier and release the other waiting threads
       if (Thread.interrupted()) {
           breakBarrier();
           throw new InterruptedException();
       }
       // Each call means one more thread has arrived at the barrier
       int index = --count;
       // If no more threads are awaited, this is the last arrival: release all threads
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               //Wake up other threads and reset the status
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

       // Otherwise, loop until tripped, broken, interrupted, or timed out
       for (;;) {
           try {
               if (!timed)
                   // Block until woken up;
                   // the lock is released while waiting
                   trip.await();
               else if (nanos > 0L)
                   nanos = trip.awaitNanos(nanos);
           } catch (InterruptedException ie) {
               if (g == generation && ! g.broken) {
                   breakBarrier();
                   throw ie;
               } else {
                   // We're about to finish waiting even if we had not
                   // been interrupted, so this interrupt is deemed to
                   // "belong" to subsequent execution.
                   Thread.currentThread().interrupt();
               }
           }

           if (g.broken)
               throw new BrokenBarrierException();

           if (g != generation)
               return index;

           if (timed && nanos <= 0L) {
               breakBarrier();
               throw new TimeoutException();
           }
       }
   } finally {
       lock.unlock();
   }
}
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

With this, the principle of CyclicBarrier is easy to understand. The reentrant lock ensures that only one thread at a time can update the barrier state. A thread that finds it is not the last to arrive calls trip.await(), which releases the lock. When the last thread arrives, it calls trip.signalAll() to wake up all waiting threads and starts a new generation.
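
A usage sketch of this behaviour, with made-up names (BarrierDemo, runRounds). Each worker passes the same barrier several times, which works because nextGeneration() resets count after every trip. The barrier action is used here only to count how often the barrier was tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class BarrierDemo {

    // Lets `parties` threads meet at the barrier `rounds` times.
    // Returns how many times the barrier action ran.
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // blocks until all parties have arrived
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runRounds(3, 2));
    }
}
```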

Semaphore

Inner class

Since Semaphore has only one member variable, a Sync object, let's look directly at how it implements the synchronizer.
You can see that the semaphore is also a synchronizer based on the shared mode of AQS.
Unlike the read-write lock studied before, in the semaphore state represents how many more threads are allowed to enter, rather than how many threads currently hold the lock.

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    //Set up resources
    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Try to take the permits. If the remaining count would be negative, return it without a CAS so that AQS enqueues the thread
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

Construction method

Semaphores also come in two modes: fair and nonfair.

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

The code above sums up the principle of the semaphore: state records how many more threads may still enter. If taking a permit would leave a negative number, the thread enters the AQS queue and waits.
When a permit is released, the first waiting thread is woken up, and because the lock is shared, the wake-up is passed along the queue for as long as permits may remain. The awakened threads compete again, and any thread that fails to obtain a permit goes back to waiting.
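
The effect can be checked with a small experiment. The sketch below uses made-up names (SemaphoreDemo, maxConcurrent) and an arbitrary 20 ms sleep to simulate work. It records the largest number of threads that were inside the guarded section at the same time, which should never exceed the number of permits.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class SemaphoreDemo {

    // Runs `threads` workers through a section guarded by `permits` permits
    // and returns the highest concurrency observed inside that section.
    static int maxConcurrent(int permits, int threads) throws InterruptedException {
        Semaphore sem = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    sem.acquire(); // state: available -> available - 1, or wait
                    try {
                        int now = inside.incrementAndGet();
                        max.accumulateAndGet(now, Math::max);
                        Thread.sleep(20); // simulated work
                    } finally {
                        inside.decrementAndGet();
                        sem.release(); // state: current -> current + 1
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxConcurrent(2, 6) <= 2);
    }
}
```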

Posted by rawb on Mon, 30 May 2022 02:38:22 +0530