CountDownLatch Latch
Introduction
Function: a CountDownLatch makes one thread (or a group of threads) wait until other threads have finished their work before it continues. For example, during an exam the teacher cannot leave until every student has handed in a test paper. The teacher is the waiting thread, and the students are the executing threads.
Use Cases
    import java.util.concurrent.CountDownLatch;

    public class CountDownLatchTest {
        public static void main(String[] args) {
            // The latch opens after countDown() has been called twice
            CountDownLatch countDownLatch = new CountDownLatch(2);
            System.out.println("Start testing, testers are ready");

            new Thread(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println("Tester 1 submits test results");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Always count down, even if the work failed
                    countDownLatch.countDown();
                }
            }).start();

            new Thread(() -> {
                try {
                    Thread.sleep(7000);
                    System.out.println("Tester 2 submits test results");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();

            new Thread(() -> {
                try {
                    // Blocks until both testers have counted down
                    countDownLatch.await();
                    System.out.println("Start summarizing test results and submit test reports");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // Output:
    // Start testing, testers are ready
    // Tester 1 submits test results
    // Tester 2 submits test results
    // Start summarizing test results and submit test reports
underlying principle
    /*
     * As the example above shows, CountDownLatch relies on two methods:
     * countDown() decrements the count by 1, and await() blocks until the
     * count reaches zero. Let's look at the source code. Note that the
     * source shown here is based on JDK 1.8.
     */

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }
    /*
     * Here we find that Sync extends AbstractQueuedSynchronizer (AQS).
     * One job of AQS is to maintain the synchronization state and the queue
     * of waiting threads, and to handle acquiring and releasing. In other
     * words, CountDownLatch uses the AQS state to hold its count.
     * releaseShared(1) performs a release in shared mode.
     */

    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        // Acquire succeeds (returns 1) only once the count has reached zero
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                // Atomically sets the state to nextc if the current state
                // still equals the expected value c; otherwise retry
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
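To make the role of `tryAcquireShared` and `tryReleaseShared` more concrete, here is a minimal sketch of a latch built directly on AQS in the same way as the `Sync` class above. `MiniLatchDemo`, `MiniLatch`, and `runWorkers` are hypothetical names invented for this illustration; this is not the JDK implementation, only a cut-down imitation of it.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MiniLatchDemo {

    /** Hypothetical simplified latch; mirrors the structure of CountDownLatch.Sync. */
    static final class MiniLatch {
        private static final class Sync extends AbstractQueuedSynchronizer {
            Sync(int count) { setState(count); }

            int count() { return getState(); }

            @Override
            protected int tryAcquireShared(int ignored) {
                // A non-negative result lets the waiting thread through
                return getState() == 0 ? 1 : -1;
            }

            @Override
            protected boolean tryReleaseShared(int ignored) {
                for (;;) {
                    int c = getState();
                    if (c == 0) return false;          // already open, nothing to do
                    int next = c - 1;
                    if (compareAndSetState(c, next)) { // retry if another thread won the race
                        return next == 0;              // true wakes up the waiters
                    }
                }
            }
        }

        private final Sync sync;

        MiniLatch(int count) { sync = new Sync(count); }

        void countDown() { sync.releaseShared(1); }

        void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

        int getCount() { return sync.count(); }
    }

    /** Starts the given number of workers, waits for all of them, returns the final count. */
    static int runWorkers(int workers) {
        MiniLatch latch = new MiniLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("count after await: " + runWorkers(3)); // count after await: 0
    }
}
```

The CAS loop is what makes concurrent `countDown()` calls safe without a lock: each caller either decrements the state or retries with the fresh value.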
    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
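The behaviour described in the Javadoc can be checked with a few small experiments. The sketch below uses only the standard `CountDownLatch` API; the class name `LatchAwaitDemo` and its method names are made up for this illustration, and the 50 ms timeout is an arbitrary value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchAwaitDemo {

    /** Returns true if await() throws InterruptedException and clears the interrupted status. */
    static boolean awaitIsInterruptible() {
        CountDownLatch neverOpened = new CountDownLatch(1);
        Thread.currentThread().interrupt(); // interrupted status is set on entry
        try {
            neverOpened.await();
            return false; // not reached: the latch never opens
        } catch (InterruptedException e) {
            return !Thread.currentThread().isInterrupted();
        }
    }

    /** Returns true if the timed await() gives up while the count is still above zero. */
    static boolean timedAwaitGivesUp() {
        CountDownLatch neverOpened = new CountDownLatch(1);
        try {
            return !neverOpened.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Calls countDown() more often than the initial count and returns the final count. */
    static long extraCountDownIsNoOp() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown(); // count is already zero, so nothing happens
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(awaitIsInterruptible());  // true
        System.out.println(timedAwaitGivesUp());     // true
        System.out.println(extraCountDownIsNoOp());  // 0
    }
}
```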
CyclicBarrier (barrier)
Introduction
- A barrier (sometimes translated as "fence") is similar to a latch: it blocks a group of threads until some event occurs. The key difference is that with a barrier, all threads must have arrived at the barrier position before any of them can continue. Latches are used to wait for events, while barriers are used to wait for other threads.
- CyclicBarrier lets a fixed number of threads meet at the barrier position repeatedly. When a thread reaches the barrier position it calls await(), which blocks until all threads have arrived. Once the last thread arrives, the barrier opens, all threads are released, and the barrier is reset for the next use.
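The "reset for the next use" behaviour can be seen by sending the same threads through one barrier several times. The following is a minimal sketch: `BarrierRoundsDemo` and `runRounds` are hypothetical names, and the barrier action simply counts how often the barrier has opened.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierRoundsDemo {

    /** Runs the given number of rounds and returns how many times the barrier opened. */
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per round, in the last thread to arrive
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier opened " + runRounds(3, 4) + " times"); // barrier opened 4 times
    }
}
```

No explicit reset() call is needed here: the barrier starts a new round by itself each time it opens.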
Use Cases
    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;

    public class CyclicBarrierTest {
        public static void main(String[] args) {
            int playerCount = 3;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(playerCount);
            for (int i = 0; i < playerCount; i++) {
                Player player = new Player(cyclicBarrier);
                player.setName(String.valueOf(i));
                player.start();
            }
        }

        static class Player extends Thread {
            CyclicBarrier cyclicBarrier;

            public Player(CyclicBarrier cyclicBarrier) {
                // All players must share the same barrier object
                this.cyclicBarrier = cyclicBarrier;
            }

            @Override
            public void run() {
                try {
                    String name = Thread.currentThread().getName();
                    System.out.println("Player " + name + " starts preparing...");
                    int time = new Random().nextInt(10);
                    Thread.sleep(time * 1000);
                    System.out.println("Player " + name + " is ready...");
                    // Blocks until the other players have reached the barrier
                    cyclicBarrier.await();
                    System.out.println("Player " + name + " enters the game...");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Sample output (the order within each phase varies between runs):
    // Player 0 starts preparing...
    // Player 2 starts preparing...
    // Player 1 starts preparing...
    // Player 0 is ready...
    // Player 1 is ready...
    // Player 2 is ready...
    // Player 2 enters the game...
    // Player 0 enters the game...
    // Player 1 enters the game...
underlying principle
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties (the number of threads that must arrive) */
    private final int parties;
    /** The command to run when tripped (the barrier action) */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation. It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    // CyclicBarrier internally uses two classes, ReentrantLock and Condition.
    // It has two constructors:
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    // Each thread calls await() to tell the CyclicBarrier that it has reached
    // the barrier, and the calling thread is then blocked.
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            // Wait without a timeout
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /*
     * The main logic of dowait(boolean, long) is fairly simple. A thread that
     * is not the last one to call await() keeps waiting unless one of the
     * following happens:
     * 1. The last thread arrives, i.e. index == 0
     * 2. A participating thread times out while waiting
     * 3. A participating thread is interrupted
     * 4. reset() is called on the CyclicBarrier, which resets the barrier
     *    to its initial state
     */
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        // Acquire the exclusive lock
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // The current generation
            final Generation g = generation;

            // If this generation is broken, throw an exception
            if (g.broken)
                throw new BrokenBarrierException();

            // If the thread has been interrupted, mark the barrier as broken,
            // wake up the other threads blocked on it, and throw an exception
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // Arrival index of this thread
            int index = --count;
            // 0 means this is the last thread to arrive
            if (index == 0) { // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // Run the barrier action, if there is one
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // Wake up the waiting threads, reset the count,
                    // and start a new generation
                    nextGeneration();
                    return 0;
                } finally {
                    // If the barrier action failed, mark the barrier as broken
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    // Without a time limit, wait until woken up
                    if (!timed)
                        trip.await();
                    // With a time limit, wait for at most the given time
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // Still in the current generation and it is not broken yet
                    if (g == generation && !g.broken) {
                        // Break the barrier
                        breakBarrier();
                        throw ie;
                    } else {
                        // Otherwise this thread's generation is already over,
                        // so the interrupt does not affect the current
                        // generation; just restore the interrupted status
                        Thread.currentThread().interrupt();
                    }
                }

                // When any thread is interrupted, breakBarrier() is called.
                // It wakes up the other threads, which then throw an exception
                if (g.broken)
                    throw new BrokenBarrierException();

                // g != generation means the barrier opened normally and a new
                // generation has started, so return this thread's arrival index.
                // If g == generation, the generation has not changed. The thread
                // can still wake up in that case, because a Condition wait may
                // return without a signal (a spurious wakeup) and a timed wait
                // returns when its time runs out. That is why the generation
                // must be checked again after every wakeup.
                if (g != generation)
                    return index;

                // With a time limit and no time left, break the barrier
                // and throw an exception
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // Release the exclusive lock
            lock.unlock();
        }
    }
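The broken state handled in `dowait` can also be observed from the outside. The sketch below lets a single thread time out on a two-party barrier and then inspects the result. It uses only the public `CyclicBarrier` API; `BrokenBarrierDemo`, `run`, and the 50 ms timeout are assumptions made for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BrokenBarrierDemo {

    /** Walks a barrier through timeout, broken state and reset; returns a short log. */
    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(2); // two parties, but only one ever arrives
        StringBuilder log = new StringBuilder();

        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            log.append("timeout,"); // the timed wait ran out and broke the barrier
        } catch (InterruptedException | BrokenBarrierException e) {
            log.append("unexpected,");
        }
        log.append("broken=").append(barrier.isBroken()).append(',');

        try {
            barrier.await(); // a broken barrier rejects new arrivals immediately
        } catch (BrokenBarrierException e) {
            log.append("rejected,");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.append("unexpected,");
        }

        barrier.reset(); // starts a fresh, unbroken generation
        log.append("afterReset=").append(barrier.isBroken());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // timeout,broken=true,rejected,afterReset=false
    }
}
```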
Latches and Barriers
Differences between the two
- Number of uses: a CountDownLatch can only be used once, because its count cannot be reset after it reaches zero. A CyclicBarrier resets itself automatically each time it opens and can also be reset explicitly with reset(), so it can be used multiple times and can handle more complex scenarios.
- Application scenario: CountDownLatch lets one or more threads wait for a set of events to occur, while CyclicBarrier makes a group of threads wait for each other to reach the barrier position.
- Methods provided: CyclicBarrier also offers some other useful methods, such as getNumberWaiting(), which returns the number of threads currently blocked at the barrier, and isBroken(), which reports whether the barrier is in a broken state (for example because a waiting thread was interrupted or timed out).
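The inspection methods from the last point can be tried out as follows. This is a small sketch under the assumption that polling every 10 ms is acceptable for a demo; `BarrierInspectDemo` and `observe` are hypothetical names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierInspectDemo {

    /** Returns {waiting before the barrier opens, waiting after it opens, parties}. */
    static int[] observe() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        try {
            // Poll until the waiter thread is actually blocked at the barrier
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(10);
            }
            int waitingBefore = barrier.getNumberWaiting();

            barrier.await(); // second party arrives, so the barrier opens
            waiter.join();

            int waitingAfter = barrier.getNumberWaiting();
            return new int[] {waitingBefore, waitingAfter, barrier.getParties()};
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = observe();
        System.out.println("waiting before: " + result[0]); // waiting before: 1
        System.out.println("waiting after: " + result[1]);  // waiting after: 0
        System.out.println("parties: " + result[2]);        // parties: 2
    }
}
```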