[Guide to Concurrent Programming in Java] "Difficulties - Core - Omissions": the working principle and source code analysis of CountDownLatch

Analysis of the working principle of CountDownLatch

1. General introduction

This article analyzes the working principle of CountDownLatch as implemented in JDK 1.8;

Simple understanding of CountDownLatch

What is CountDownLatch?

  1. Taken literally, "count down" means counting downward by subtraction, and "latch" means a bolt or gate that stays closed until it is released;

  2. CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes;

  3. Unlike ReentrantLock or Semaphore, CountDownLatch has no fair/non-fair static inner classes. It has a single static inner class, Sync, and almost every CountDownLatch method simply delegates to sync.xxx;

  4. CountDownLatch internally maintains a counter. While the counter is not 0, threads that call await block; once the counter reaches 0 they are released and continue executing;

The state variable of CountDownLatch

  1. The implementation of CountDownLatch simply makes good use of the state variable of its parent class AQS;

  2. The counter is initialized to a given value, say N. Each time any thread calls countDown, the counter is decremented by 1, and waiting threads are not released until the counter reaches 0;

  3. Put simply: the threads of group A wait for the threads of group B, and only after the group B threads have finished can the group A threads continue;

Commonly used methods

Creates a latch object with the given initial count

public CountDownLatch(int count)

Blocks the calling thread until the counter reaches 0 (returns immediately if it is already 0)

public void await() throws InterruptedException

Decrements the counter by 1. If the counter reaches 0 as a result, all waiting threads are woken up

public void countDown()

Returns the current value of the counter

public long getCount()
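To make the four methods above concrete, here is a minimal single-threaded sketch. The class name LatchMethodsDemo and the method observeCounts are made up for this illustration; only the CountDownLatch calls come from the JDK.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

class LatchMethodsDemo {
    // Hypothetical helper: records getCount() after construction and after each countDown().
    static long[] observeCounts() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        long initial = latch.getCount();    // counter starts at the constructor argument
        latch.countDown();
        long afterOne = latch.getCount();
        latch.countDown();
        long afterTwo = latch.getCount();
        latch.await();                      // does not block: the counter is already zero
        latch.countDown();                  // has no effect once the counter is zero
        long afterExtra = latch.getCount();
        return new long[] {initial, afterOne, afterTwo, afterExtra};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(observeCounts())); // prints [2, 1, 0, 0]
    }
}
```

Note that the extra countDown() call after the counter has reached zero neither throws nor makes the counter negative.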

Design and implementation pseudocode

Acquire a shared lock:

  • If the thread is found to be interrupted on entry, an InterruptedException is thrown;
  • Otherwise, try to acquire in shared mode (how the attempt works is defined by the AQS subclass; for CountDownLatch it succeeds only when the counter is zero);
  • If the attempt fails, add a new shared-mode node to the queue in a spin loop, then block by calling LockSupport.park until the counter reaches zero
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

Release the shared lock:

  • Try to release in shared mode (how the attempt works is defined by the AQS subclass; for CountDownLatch it decrements the counter and succeeds only when the counter reaches zero);
  • If the attempt succeeds, wake up the blocked threads in a spin loop
    public void countDown() {
        sync.releaseShared(1);
    }
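The acquire/release outline above can be tried out with a small latch built directly on AbstractQueuedSynchronizer. This is a sketch that mirrors the structure described in this article, not the JDK source itself; the names SimpleLatch and demo are hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleLatch {
    // The AQS state field holds the remaining count.
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int count() { return getState(); }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Acquisition succeeds only once the counter has reached zero.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;               // nothing left to decrement
                int next = c - 1;
                if (compareAndSetState(c, next)) return next == 0; // signal only on 1 -> 0
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    void countDown() { sync.releaseShared(1); }

    int getCount() { return sync.count(); }

    // Hypothetical demo: three threads count down while the caller waits.
    static boolean demo() throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(latch::countDown).start();
        }
        latch.await();                // returns once all three decrements have happened
        return latch.getCount() == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("latch opened: " + demo());
    }
}
```

All of the queueing, parking and waking is inherited from AQS; the subclass only decides when an acquire or release counts as successful.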

Understanding CountDownLatch through a real-life example

Take a 100-meter race as an example to illustrate the principle of CountDownLatch:

  • 1. Scene: ten people take part in a 100-meter race, and a referee stands at the finish line counting;

  • 2. When the start signal sounds, the ten runners rush toward the finish line. Those few seconds are truly exciting;

  • 3. When a runner reaches the finish line, that runner's race is over and he is free to do as he likes; the referee then subtracts one from the number of runners still on the track;

  • 4. As the runners cross the finish line one after another, the referee's count eventually shows that 0 runners have yet to arrive, which means everyone has finished;

  • 5. The referee then takes the recorded results and enters them into the computer;

  • 6. This whole sequence corresponds to the threads of group A (the referee) waiting for the operations of another group of threads (the runners) until the counter reaches zero, after which A goes on to do other things;
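The race can be sketched in code with two latches: one for the start signal and one for the finish line. The class RaceDemo, the method runRace and the variable names are invented for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class RaceDemo {
    // Returns how many runners crossed the finish line before the referee continued.
    static int runRace(int runners) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);      // the starting gun
        CountDownLatch finishLine = new CountDownLatch(runners); // the referee's count
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < runners; i++) {
            new Thread(() -> {
                try {
                    startSignal.await();          // every runner waits for the gun
                    finished.incrementAndGet();   // "running" the race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finishLine.countDown();       // the referee subtracts one runner
                }
            }).start();
        }

        startSignal.countDown();  // fire the gun: all runners are released together
        finishLine.await();       // the referee waits until the count reaches zero
        return finished.get();    // only now are the results recorded
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("runners finished: " + runRace(10)); // prints runners finished: 10
    }
}
```

Here the referee is the thread calling runRace, and each runner is one of the worker threads.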

Source code analysis of CountDownLatch

CountDownLatch constructor

Constructor source code:

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

The constructor creates a latch object with the given counter value. The counter value must not be negative (zero is allowed, in which case await returns immediately), and it is ultimately stored in the AQS state field that represents the shared resource;
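The two boundary cases of the constructor can be checked with a short sketch. The class ConstructorDemo and its method names are hypothetical; the behavior follows the constructor source and Javadoc quoted above.

```java
import java.util.concurrent.CountDownLatch;

class ConstructorDemo {
    // A negative count is rejected by the constructor.
    static boolean rejectsNegative() {
        try {
            new CountDownLatch(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // A count of zero is legal: the latch is already open, so await() does not block.
    static long zeroCountIsOpen() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(0);
        latch.await();
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(rejectsNegative());   // prints true
        System.out.println(zeroCountIsOpen());   // prints 0
    }
}
```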

Sync synchronizer

AQS --> Sync

CountDownLatch does all of its synchronization through Sync, a private static inner class that extends AQS (it is a concrete class, not an interface). If you look closely, you will find that almost every method is implemented by calling sync.xxx;

await()

source code

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * // Causes the current thread to wait until the counter value reaches zero, or until the thread is interrupted;
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
  • If the counter is greater than zero when await is called, the calling thread waits; at its core it is blocked by a call to LockSupport.park;
  • The wait ends when the counter value state reaches 0, or when the waiting thread is interrupted, in which case InterruptedException is thrown;
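The interrupt path described above can be observed with a latch that is never counted down. This is a minimal sketch; AwaitInterruptDemo and interruptedWhileWaiting are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class AwaitInterruptDemo {
    static boolean interruptedWhileWaiting() throws InterruptedException {
        CountDownLatch neverOpened = new CountDownLatch(1); // nobody calls countDown()
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                neverOpened.await();     // would wait forever without the interrupt
            } catch (InterruptedException e) {
                sawInterrupt.set(true);  // the interrupt ends the wait
            }
        });
        waiter.start();
        // Whether the interrupt arrives before await() is entered or while the thread
        // is parked, await() throws InterruptedException in both cases.
        waiter.interrupt();
        waiter.join();
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptedWhileWaiting()); // prints true
    }
}
```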

acquireSharedInterruptibly(int)

source code:

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // Check the interrupt flag bit of the thread before calling to check whether the thread has been interrupted before
            throw new InterruptedException(); // If interrupted, an interrupt exception is thrown
        if (tryAcquireShared(arg) < 0) // Try to acquire the shared resource lock, if it is less than 0, the acquisition fails. This method is implemented by the specific subclass of AQS
            doAcquireSharedInterruptibly(arg); // Enqueue the thread trying to acquire the lock resource
    }

Because the latch acts as a counter, the first call to tryAcquireShared returns a negative value whenever the counter is still greater than zero, so the thread goes on into doAcquireSharedInterruptibly and waits;
The reason lies in the subclass implementation, which evaluates "(getState() == 0) ? 1 : -1";

tryAcquireShared(int)

source code

	protected int tryAcquireShared(int acquires) {
		return (getState() == 0) ? 1 : -1; // Returns 1 (acquisition succeeds) when the counter is zero, otherwise -1 (acquisition fails)
	}

This method tries to acquire the shared resource. In CountDownLatch, a return value of -1 (the counter is not yet zero) means the thread must join the queue and block; a return value of 1 (the counter is zero) means the acquisition succeeds and the await call returns;

doAcquireSharedInterruptibly(int)

source code

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // Create a new node for the given mode. There are two modes: Node.EXCLUSIVE (exclusive) and Node.SHARED (shared);
        final Node node = addWaiter(Node.SHARED); // Create a shared-mode node and append it to the queue
        boolean failed = true;
        try {
            for (;;) { // Spin in an infinite loop
                final Node p = node.predecessor(); // Get the predecessor of this node
                if (p == head) { // If the predecessor is head, this node is first in line
                    int r = tryAcquireShared(arg); // Being first in line, try to acquire again in case the counter has just reached zero
                    if (r >= 0) { // r >= 0 means the shared acquisition succeeded
                        setHeadAndPropagate(node, r); // Make this node the new head and propagate the wake-up to the following shared-mode nodes (via doReleaseShared)
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    // When await is first called with a non-zero counter, the acquisition fails here (r < 0), so the code falls through to the check of whether the thread should park
                    // On the first pass the predecessor's waitStatus is 0, so shouldParkAfterFailedAcquire sets it to SIGNAL, returns false, and the loop runs again
                    // On the second pass shouldParkAfterFailedAcquire returns true, meaning the thread should park, and park is called to block and wait
                }
                if (shouldParkAfterFailedAcquire(p, node) && // Decide from the predecessor's status whether this thread should park
                    parkAndCheckInterrupt()) // Block here; normally the thread stays parked in this call until it is woken up
                    // If parkAndCheckInterrupt() reports after wake-up that the thread was interrupted, an InterruptedException is thrown
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

When doAcquireSharedInterruptibly is used to implement the counter, its main job is to block the thread and keep it waiting until the counter value reaches zero, at which point the thread is woken up;
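The "park, wake up, re-check" pattern at the heart of this method can be shown in isolation with LockSupport. This is a deliberately simplified sketch: real AQS keeps a queue of nodes, whereas here a single flag (open) stands in for "the counter is zero". The names ParkLoopDemo, open and passed are made up.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkLoopDemo {
    static boolean demo() throws InterruptedException {
        AtomicBoolean open = new AtomicBoolean(false);   // stands in for state == 0
        AtomicBoolean passed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark(), so the condition
            // is always re-checked in a loop, just as AQS re-runs tryAcquireShared.
            while (!open.get()) {
                LockSupport.park();
            }
            passed.set(true);
        });
        waiter.start();

        open.set(true);             // make the condition true first ...
        LockSupport.unpark(waiter); // ... then wake the waiter so it can re-check
        waiter.join();
        return passed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints true
    }
}
```

Setting the condition before calling unpark matters: if the waiter is woken first and the flag is still false, it simply parks again.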

countDown()

source code

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1); // Decrement the counter by one
    }

countDown decrements the counter value by 1 on each call. When the counter reaches zero, this method releases all threads waiting in the queue; for why all of them are released, see the explanation of releaseShared(int arg) below;

releaseShared(int)

source code:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // Try to release in shared mode; this method is implemented by the concrete AQS subclass
            doReleaseShared(); // Spin loop that wakes up the successor of the head node
            return true; // true: the counter reached zero and the waiting threads are being woken
        }
        return false; // false: nothing was released; for CountDownLatch this is the case whenever the decremented counter is not zero
    }
  1. The releaseShared method first checks the return value of tryReleaseShared(arg). As long as the decremented counter is not zero, tryReleaseShared returns false, so releaseShared returns false immediately;

  2. When the counter finally drops to zero, tryReleaseShared returns true, and releaseShared then calls doReleaseShared to wake up the threads waiting in the queue;

tryReleaseShared(int)

source code:

	// The tryReleaseShared method of Sync, the static inner class of CountDownLatch
	protected boolean tryReleaseShared(int releases) {
		// Decrement count; signal when transition to zero
		for (;;) { // Spin in an infinite loop
			int c = getState(); // Read the latest counter value
			if (c == 0) // The counter is already zero, so there is nothing to decrement or signal; return false
				return false;
			int nextc = c-1; // Compute the counter value minus 1
			if (compareAndSetState(c, nextc)) // Try to install the new value with CAS
				return nextc == 0; // CAS succeeded: return true only if the new value is zero, which means all waiting threads must be released
			// If the CAS fails, another thread changed the counter concurrently, so loop and retry with the freshly read value
		}
	}

Sync, the static inner class of CountDownLatch, overrides this AQS method to define how a release is handled. It returns true only for the call that moves the counter from 1 to 0, which tells AQS to wake up all waiting threads; in every other case it returns false and nothing is woken;
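The same CAS retry loop can be reproduced outside AQS with an AtomicInteger, which makes it easy to check that only one caller ever observes the transition to zero. This is an illustrative sketch; CasCounter, decrement and zeroTransitions are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCounter {
    private final AtomicInteger count;

    CasCounter(int initial) { this.count = new AtomicInteger(initial); }

    // Returns true only for the call that moves the counter from 1 to 0.
    boolean decrement() {
        for (;;) {
            int c = count.get();
            if (c == 0) return false;                 // already at zero: nothing to do
            int next = c - 1;
            if (count.compareAndSet(c, next)) return next == 0;
            // CAS failed: another thread changed the value, so retry with a fresh read
        }
    }

    // Runs more threads than there are counts and reports how many saw the 1 -> 0 transition.
    static int zeroTransitions(int initial, int threads) throws InterruptedException {
        CasCounter counter = new CasCounter(initial);
        AtomicInteger hits = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (counter.decrement()) hits.incrementAndGet();
            });
            workers[i].start();
        }
        for (Thread t : workers) t.join();
        return hits.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(zeroTransitions(4, 8)); // prints 1
    }
}
```

Because exactly one call returns true, the wake-up in releaseShared is triggered once, no matter how many threads call countDown concurrently.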

doReleaseShared()

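The main purpose is to wake up the threads waiting in the queue. It is called as soon as the counter value reaches zero, and it loops (spins) until the wake-up signal has been delivered from the head of the queue; each woken thread then passes the wake-up on to its successor through setHeadAndPropagate;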

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) { // The infinite loop operation mode of spin
            Node h = head; // Each time the head node of the queue is taken out
            if (h != null && h != tail) { // If the head node is not empty and is not the tail node
                int ws = h.waitStatus; // Then get the waitStatus status value of the head node
                if (ws == Node.SIGNAL) { // If the head node is in the SIGNAL state, it means that the successor node of the head node needs to be woken up
					// Try to reset the head node's waitStatus to 0 through CAS; if that fails, loop again, because another thread may be releasing concurrently
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // Wake up the successor node of the head node
                }
				// If the head node's waitStatus is 0, change it to PROPAGATE. If the CAS fails, the status was changed concurrently, so loop again
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
			// If the head node has not changed, the work above is complete and the loop can exit
			// If it has changed, a woken thread has become the new head in the meantime, so retry to make sure the wake-up keeps being passed along
            if (h == head)                   // loop if head changed
                break;
        }
    }

Summary

1. With the AQS analysis as a foundation, analyzing CountDownLatch goes much faster;

2. Here is a brief summary of the characteristics of CountDownLatch:
	• It manages a counter value that is greater than or equal to zero;
	• Each call to countDown subtracts 1 from state; when the count reaches 0, all threads waiting in the queue are released;
	• countDown and await can also be combined to get behavior similar to CyclicBarrier, although a CountDownLatch cannot be reset and reused the way a CyclicBarrier can;
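The difference from CyclicBarrier mentioned in the last point can be seen in a short sketch: a latch stays open once its counter reaches zero, while a barrier resets after each round. The class OneShotVsBarrier and its method names are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class OneShotVsBarrier {
    // Once open, a latch stays open: later await calls return immediately.
    static boolean latchStaysOpen() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.await();
        return latch.await(0, TimeUnit.MILLISECONDS); // true: the counter is still zero
    }

    // A CyclicBarrier can be reused: two parties meet twice at the same barrier.
    static int barrierRounds() throws InterruptedException, BrokenBarrierException {
        AtomicInteger rounds = new AtomicInteger();
        // The barrier action runs once per round, when the last party arrives.
        CyclicBarrier barrier = new CyclicBarrier(2, rounds::incrementAndGet);
        Thread other = new Thread(() -> {
            try {
                barrier.await();
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        });
        other.start();
        barrier.await(); // round 1
        barrier.await(); // round 2, same barrier object
        other.join();
        return rounds.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(latchStaysOpen()); // prints true
        System.out.println(barrierRounds());  // prints 2
    }
}
```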

CountDownLatch usage

The CountDownLatch class provides only one constructor:

public CountDownLatch(int count) {  };  //The parameter count is the count value

Then the following three methods are the most important methods in the CountDownLatch class:

public void await() throws InterruptedException { };   //The thread that calls await() is suspended and does not continue until the count value is 0
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //Like await(), except that if the count value has still not reached 0 after the given timeout, it stops waiting and returns false (it returns true if the count reached 0 in time)
public void countDown() { };  //Decreases the count value by 1
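The boolean result of the timed await is easy to overlook, so here is a minimal sketch of both outcomes. The class TimedAwaitDemo and the 50 ms timeout are arbitrary choices for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitDemo {
    // Returns {result before countDown, result after countDown}.
    static boolean[] demo() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // Nobody has counted down yet, so the wait times out and returns false.
        boolean before = latch.await(50, TimeUnit.MILLISECONDS);
        latch.countDown();
        // The counter is now zero, so the call returns true without waiting.
        boolean after = latch.await(50, TimeUnit.MILLISECONDS);
        return new boolean[] {before, after};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints false true
    }
}
```

Callers should check the return value: after a false result the work being waited for has not finished yet.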

CountDownLatch is a synchronization helper class that allows one or more threads to wait until a set of operations being performed in other threads completes.

To illustrate with an example:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            new Thread(new ReadNum(i, countDownLatch)).start();
        }
        countDownLatch.await(); // Blocks until all five workers have called countDown()
        System.out.println("Thread execution ends. . . .");
    }

    static class ReadNum implements Runnable {
        private final int id;
        private final CountDownLatch latch;

        public ReadNum(int id, CountDownLatch latch) {
            this.id = id;
            this.latch = latch;
        }

        @Override
        public void run() {
            System.out.println("id:" + id);
            latch.countDown(); // Decrement the count; the worker itself does not block
            System.out.println("Thread group task " + id + " ends, other tasks continue");
        }
    }
}

Output result (the order of the lines can differ from run to run):

id:1
Thread group task 1 ends, other tasks continue
id:0
Thread group task 0 ends, other tasks continue
id:2
Thread group task 2 ends, other tasks continue
id:3
Thread group task 3 ends, other tasks continue
id:4
Thread group task 4 ends, other tasks continue
Thread execution ends. . . .

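After calling countDown(), a thread continues with its own work, because countDown() never blocks the caller. Since each worker calls countDown() before its second println, the main thread's final line can in principle appear before a worker's last line.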

Tags: Java jvm programming language

Posted by freynolds on Fri, 09 Dec 2022 13:57:38 +0530