Overview
Anyone who has read a little concurrency source code knows that many concurrency tools, such as ReentrantLock and CountDownLatch, are built on AQS, short for AbstractQueuedSynchronizer.
AQS is a framework that provides atomic management of a synchronization state, blocking and waking of threads, and a queue model. Synchronization tools generally divide lock control into exclusive locks and shared locks, and AQS supports both modes.
Exclusive lock: the lock can be held by only one thread at a time. Once a thread acquires it, other threads that want it must wait until it is released. A write lock is the typical case: if multiple threads write at the same time, thread-safety problems are likely, so only one thread may write at a time. ReentrantLock in the JDK is an exclusive lock.
Shared lock: the lock can be held by multiple threads at the same time. Each successful acquisition is recorded in the synchronization state (a read lock, for example, increments its reader count). A read lock is the typical case: reading has no side effects, so multiple threads can read the same data at the same time, provided no thread is writing. Examples are the read lock of ReentrantReadWriteLock and CountDownLatch.
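To make the contrast with exclusive mode concrete, here is a minimal sketch of a shared-mode synchronizer, modeled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc. The class name SimpleLatch and its method names are illustrative and not part of the JDK; await() here is the uninterruptible variant to keep the sketch short.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative shared-mode synchronizer: once signalled, every waiter may pass.
class SimpleLatch {
    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        // Shared acquire: a non-negative result means success,
        // a negative result means the caller has to wait.
        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        // Shared release: open the latch and let AQS wake the waiters.
        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() {
        return sync.isSignalled();
    }

    public void signal() {
        sync.releaseShared(1);
    }

    public void await() {
        sync.acquireShared(1);
    }
}
```

After signal() has been called, any number of threads can return from await(), which is exactly what an exclusive lock does not allow.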
This article focuses on AQS's support for the exclusive lock mode.
Example of custom exclusive lock
First, we write a very simple exclusive lock on top of AQS to see how AQS is used.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ExclusiveLock implements Lock {

    // Synchronizer, inherited from AQS
    private static class Sync extends AbstractQueuedSynchronizer {

        // Define how the lock is acquired
        @Override
        protected boolean tryAcquire(int acquires) {
            assert acquires == 1;
            // Grab the lock with a CAS from 0 (unlocked) to 1 (locked)
            if (compareAndSetState(0, 1)) {
                // Record the current thread as the owner of the lock
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Define how the lock is released
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1;
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            // Clear the owner thread
            setExclusiveOwnerThread(null);
            // Release the lock
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    // Conditions are not supported in this demo
    @Override
    public Condition newCondition() {
        return null;
    }
}
Here is a non-reentrant exclusive lock class that uses a value of 0 for the unlocked state and a value of 1 for the locked state.
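The non-reentrant behavior can be observed directly. The sketch below uses a stripped-down stand-in for the Sync class above; NonReentrantSync, tryLock(), unlock() and isLocked() are names chosen for this illustration only.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Minimal stand-in for the demo synchronizer: 0 = unlocked, 1 = locked.
class NonReentrantSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (getState() == 0) {
            throw new IllegalMonitorStateException();
        }
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // Small wrappers so the demo can reach the protected hooks.
    boolean tryLock() { return tryAcquire(1); }
    void unlock() { release(1); }
    boolean isLocked() { return getState() == 1; }
}

class NonReentrantDemo {
    public static void main(String[] args) {
        NonReentrantSync sync = new NonReentrantSync();
        System.out.println(sync.tryLock()); // true: state goes 0 -> 1
        System.out.println(sync.tryLock()); // false: same thread, but the CAS from 0 fails
        sync.unlock();
        System.out.println(sync.tryLock()); // true again after the release
    }
}
```

Because the second attempt fails even for the owner, a thread that called the blocking lock() twice on such a lock would wait for itself forever.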
Verify:
public static void main(String[] args) throws InterruptedException {
    ExclusiveLock exclusiveLock = new ExclusiveLock();

    Thread thread1 = new Thread(() -> {
        exclusiveLock.lock();
        try {
            System.out.println("thread1 get lock");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            exclusiveLock.unlock();
            System.out.println("thread1 release lock");
        }
    });

    Thread thread2 = new Thread(() -> {
        exclusiveLock.lock();
        try {
            System.out.println("thread2 get lock");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            exclusiveLock.unlock();
            System.out.println("thread2 release lock");
        }
    });

    thread1.start();
    thread2.start();

    // Wait for both worker threads to finish
    thread1.join();
    thread2.join();
}
With that, a very simple exclusive lock synchronizer is complete. Next, let's look at its core mechanism.
Core Principle Mechanism
If you were asked to design an exclusive lock, what would you need to consider?
- How does a thread indicate that it has successfully grabbed the lock? One option is a state flag: state=1 means that some thread holds the lock and the other threads must wait.
- Where are the threads that fail to grab the lock kept? A queue is needed to hold the threads that failed to acquire the lock.
- How is a thread blocked? LockSupport.park and unpark can block and wake up threads.
We can find the answers to these questions in the data structure and source code of AQS.
AQS internally maintains a volatile int state (representing the shared resource) and a FIFO thread wait queue (threads that contend for the resource and have to block enter this queue).
Taking the example above: state is initialized to 0, the unlocked state. When thread A calls lock(), it calls the acquire method of AQS, which calls the tryAcquire() method overridden by the subclass to grab the lock through CAS. After that, tryAcquire() fails for other threads and they enter the CLH queue until thread A calls unlock() and releases the lock, restoring state to 0, at which point other threads get a chance to acquire it.
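Before reading the AQS source, it may help to see the three ingredients (a state flag, a queue of waiting threads, and park/unpark) combined by hand. The following is a sketch adapted from the FIFOMutex example in the LockSupport Javadoc. It is not how AQS is implemented, and the class name HandRolledMutex and the helper countWithLock are made up for this illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class HandRolledMutex {
    // State flag: false = unlocked, true = locked.
    private final AtomicBoolean locked = new AtomicBoolean(false);
    // Threads waiting for the lock, in FIFO order.
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);
        // Block while we are not first in the queue or the CAS on the flag fails.
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) { // ignore interrupts while waiting
                wasInterrupted = true;
            }
        }
        waiters.remove();
        if (wasInterrupted) {
            current.interrupt(); // re-assert the interrupt on exit
        }
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek()); // unpark(null) has no effect
    }

    // Demo helper: several threads increment a plain counter under the mutex.
    static int countWithLock(int threads, int incrementsPerThread) {
        HandRolledMutex mutex = new HandRolledMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

AQS answers the same three questions with its own state field, its CLH-style queue of Node objects, and LockSupport, and adds cancellation, timeouts, shared mode and conditions on top.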
AQS is an abstract class that provides a framework for acquiring and releasing locks, using the template method pattern. acquire and release fix the overall flow, while the tryAcquire and tryRelease methods mentioned above are the hooks that a subclass overrides for exclusive mode. The other overridable methods relate to shared mode or to Condition, and this article does not discuss them.
Source code analysis
The figure above is the class structure diagram of AQS, and the red part is an important member variable that makes up AQS.
Member variables
- state shared variable
The state field in AQS represents the synchronization state. It is declared volatile and reflects the lock status of the critical resource. It is accessed through three methods: getState(), setState(), and compareAndSetState().
A few key points about state:
- It is declared volatile to guarantee visibility across threads.
- getState(), setState(), and compareAndSetState() are final, so subclasses cannot override them.
- compareAndSetState() uses CAS, an optimistic technique, to make the update atomic.
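The CAS semantics of compareAndSetState() can be seen in isolation with a small sketch. StateCasDemo and its cas() and state() wrappers are hypothetical names that only expose the protected accessors for demonstration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class StateCasDemo extends AbstractQueuedSynchronizer {
    // Expose the protected, final accessors for demonstration only.
    boolean cas(int expect, int update) {
        return compareAndSetState(expect, update);
    }

    int state() {
        return getState();
    }

    public static void main(String[] args) {
        StateCasDemo demo = new StateCasDemo();
        System.out.println(demo.cas(0, 1)); // true: state was 0 and is now 1
        System.out.println(demo.cas(0, 1)); // false: state is already 1, nothing changes
        System.out.println(demo.state());   // 1
    }
}
```

Only one of several threads racing on cas(0, 1) can see true, which is what lets tryAcquire() pick a single winner without taking another lock.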
- CLH queue (FIFO queue)
Another important concept in AQS is the CLH queue (AQS uses a variant of it). It is implemented as a doubly linked list in which head and tail record the head node and the tail node, and the element type of the queue is Node.
private transient volatile Node head;
private transient volatile Node tail;
The structure of Node is as follows:
static final class Node {
    // Marker indicating that a node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that a node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // The node has been cancelled because of a timeout or an interrupt (when interrupts are honored).
    // A node never leaves this state.
    static final int CANCELLED = 1;
    // The successor is waiting to be woken by this node.
    // When a successor joins the queue, it sets its predecessor's status to SIGNAL.
    static final int SIGNAL = -1;
    // The node is waiting on a Condition. When another thread calls signal() on the Condition,
    // the node is transferred from the condition queue to the synchronization queue to wait for the lock.
    static final int CONDITION = -2;
    // In shared mode, the predecessor may wake not only its successor
    // but also the successor's successor.
    static final int PROPAGATE = -3;

    // One of the four values above, or 0, the initial value of a new node
    volatile int waitStatus;
    // Reference to the previous node
    volatile Node prev;
    // Reference to the next node
    volatile Node next;
    // The thread held by this node
    volatile Thread thread;
    // Next node in the condition queue
    Node nextWaiter;

    // Constructors and helper methods omitted
}
Note that a negative waitStatus indicates that the node is in a valid waiting state, while a positive value indicates that the node has been cancelled. This is why the source code often uses > 0 and < 0 to check whether a node's status is normal.
- exclusiveOwnerThread
AQS inherits this field from the AbstractOwnableSynchronizer class. It records the thread that currently holds the synchronizer in exclusive mode.
Exclusive lock acquisition acquire(int)
acquire(int) is the entry method for threads to acquire shared resources in exclusive mode.
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
The overall flow of the method is as follows:
- tryAcquire() tries to acquire the resource directly; if it succeeds, acquire() returns immediately.
- If it fails, addWaiter() wraps the current thread in a Node marked as exclusive mode (Node.EXCLUSIVE) and inserts it at the tail of the CLH queue.
- acquireQueued() makes the thread wait in the queue, blocking until it acquires the resource. It returns true if the thread was interrupted at any point during the wait, and false otherwise.
- A thread that is interrupted while waiting does not respond to the interrupt at that point. Only after it has acquired the resource does acquireQueued() return true, and acquire() then calls selfInterrupt() to re-assert the interrupt.
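The last point can be observed from the outside. The sketch below uses ReentrantLock, whose lock() is built on the AQS exclusive acquire path, rather than the demo lock from this article; the class and method names are invented for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptDuringAcquireDemo {
    // Returns the waiter's interrupt flag as observed right after lock() returned.
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);

        lock.lock(); // the calling thread holds the lock first
        Thread waiter = new Thread(() -> {
            lock.lock(); // uninterruptible acquire: keeps waiting even if interrupted
            try {
                flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            // Wait until the waiter is actually queued behind us.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1);
            }
            waiter.interrupt(); // the waiter does not give up its place in the queue
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock(); // only now can the waiter acquire the lock
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return flagAfterAcquire.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptFlagAfterLock()); // true: the interrupt was re-asserted
    }
}
```

A caller that needs to abandon the wait on interrupt would use lockInterruptibly(), which maps to acquireInterruptibly() instead.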
tryAcquire(int)
This method attempts to acquire the resource exclusively. It returns true if the acquisition succeeds and false otherwise.
// Throws by default: subclasses are expected to override this method, which reflects the template method pattern
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
AQS is just a framework; how a specific resource is acquired and released is implemented by the custom synchronizer. For example, a fair lock has its own way of acquiring the lock, and a nonfair lock has another.
addWaiter(Node)
This method is used to add the current thread to the tail of the waiting queue.
// Wrap the thread in a node and put it at the tail of the synchronization queue
private Node addWaiter(Node mode) {
    // Wrap the current thread in a Node of the synchronization queue
    Node node = new Node(Thread.currentThread(), mode);
    // The new node goes behind the current tail, so remember the current tail first
    Node pred = tail;
    // If the tail is null, the queue has not been initialized and the fast path is skipped
    if (pred != null) {
        // Point the new node's prev at the current tail
        node.prev = pred;
        // Install the new node as the tail, using CAS to guarantee atomicity
        if (compareAndSetTail(pred, node)) {
            // On success, link the old tail's next to the new tail
            pred.next = node;
            return node;
        }
    }
    // If the fast path did not succeed, fall back to enq, which keeps trying
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // Keep trying in an infinite loop
    for (;;) {
        // Remember the current tail
        Node t = tail;
        // A null tail means the queue must be initialized first;
        // the next iteration then adds the new node
        if (t == null) {
            // Use CAS to install an empty node as the dummy head
            if (compareAndSetHead(new Node()))
                // head and tail point to the same node
                tail = head;
        } else {
            // Same steps as the if block in addWaiter
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
The execution process can be summarized as follows: wrap the current thread in a node and add it to the tail of the synchronization queue. If the queue is empty, first add a dummy node and then enqueue; if the enqueue fails, keep retrying in an infinite loop (also called spinning) until it succeeds.
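The enqueue loop can be tried outside of AQS. The following sketch reproduces only the enq() logic with AtomicReference in place of the internal CAS helpers; TailEnqueueSketch, sizeFromTail() and enqueueConcurrently() are names invented for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class TailEnqueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq(): initialize lazily, then CAS the tail until it works.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy); // head and tail share the dummy node
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // Walks the prev links from the tail, counting the dummy node too.
    int sizeFromTail() {
        int count = 0;
        for (Node t = tail.get(); t != null; t = t.prev) {
            count++;
        }
        return count;
    }

    static int enqueueConcurrently(int threads, int perThread) {
        TailEnqueueSketch queue = new TailEnqueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return queue.sizeFromTail();
    }
}
```

With 4 threads enqueueing 500 nodes each, sizeFromTail() reports 2001: every node is reachable through prev, plus the one dummy head.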
acquireQueued(Node, int)
After tryAcquire() and addWaiter(), the thread has failed to acquire the resource and has been placed at the tail of the wait queue. What should it do next?
It enters a waiting state and rests until another thread releases the resource and wakes it, then acquires the resource itself and gets on with its work. This is like taking a number at a hospital: you wait in the queue (resting while there is nothing else to do) and return only once your number is called.
final boolean acquireQueued(final Node node, int arg) {
    // Whether acquiring the resource has failed
    boolean failed = true;
    try {
        // Whether the thread was interrupted while waiting
        boolean interrupted = false;
        // Spin
        for (;;) {
            // Get the predecessor
            final Node p = node.predecessor();
            // If the predecessor is head, this node is second in line and may try to acquire the resource
            // (it may have been woken by the head's release, or it may have been interrupted)
            if (p == head && tryAcquire(arg)) {
                // After acquiring the resource, make this node the head,
                // so head always refers to the node that currently holds the resource, or is null
                setHead(node);
                // setHead() has already set node.prev to null; clearing p.next here lets the GC
                // reclaim the old head node, which has now left the queue
                p.next = null;
                // The resource was acquired successfully
                failed = false;
                // Report whether the thread was interrupted while waiting
                return interrupted;
            }
            // If it is safe to rest, park() until unpark() is called.
            // If the thread is interrupted here, it wakes up from park(), finds that it still
            // cannot get the resource, and parks again.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // Record an interrupt even if it happened only once
                interrupted = true;
        }
    } finally {
        // If the loop is left without acquiring the resource (for example, tryAcquire threw),
        // cancel this node's wait in the queue
        if (failed)
            cancelAcquire(node);
    }
}
To sum up: the thread blocks in the synchronization queue until its node becomes the successor of the head node and it is woken by the thread that owns the head node. It then tries to acquire the lock and returns from the method once the acquisition succeeds. The method returns a boolean indicating whether the thread was interrupted while it waited in the queue.
shouldParkAfterFailedAcquire()
This method checks the predecessor's status to decide whether the thread can really go to rest.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the status of the predecessor
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // The predecessor has already been told to wake this node on release, so it is safe to park
        return true;
    if (ws > 0) {
        /*
         * The predecessor was cancelled: keep walking backwards until a node in a normal
         * waiting state is found, and queue behind it.
         * Note: the cancelled nodes that were skipped are no longer referenced by the chain
         * and will be reclaimed by the GC later.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // The predecessor is in a normal state: set its status to SIGNAL so that it wakes
        // this node on release. The CAS may fail, for example if the predecessor has just released.
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
Throughout this process, if the predecessor's status is not SIGNAL, the thread cannot rest yet: it must first find a valid predecessor and set that node's status to SIGNAL. Because the method returns false in that case, the thread also gets another attempt at the resource, in case it is already its turn.
parkAndCheckInterrupt()
This method is where the thread is actually blocked.
private final boolean parkAndCheckInterrupt() {
    // Call park() to put the thread into the waiting state
    LockSupport.park(this);
    // After waking up, report whether the thread was interrupted (this also clears the interrupt flag)
    return Thread.interrupted();
}
park() puts the current thread into the waiting state. In this state there are two ways to wake the thread: 1) unpark(); 2) interrupt(). park() may also return spuriously, which is one reason it is always called inside a loop.
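Both wake-up paths can be exercised with a small sketch. ParkWakeupDemo and its wokenByInterrupt() helper are invented for this illustration, and the released flag stands in for the condition a real synchronizer would re-check after waking.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkWakeupDemo {
    // Parks a worker thread, wakes it by unpark() or interrupt(), and reports
    // whether the worker saw its interrupt flag set when it woke up.
    static boolean wokenByInterrupt(boolean useInterrupt) {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            started.countDown();
            // Loop because park() may also return for no reason (spurious wakeup).
            while (!released.get() && !Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            // Same check as parkAndCheckInterrupt(): read and clear the flag.
            sawInterrupt.set(Thread.interrupted());
        });
        worker.start();
        try {
            started.await();
            if (useInterrupt) {
                worker.interrupt();
            } else {
                released.set(true);
                LockSupport.unpark(worker);
            }
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(wokenByInterrupt(false)); // false: woken by unpark()
        System.out.println(wokenByInterrupt(true));  // true: woken by interrupt()
    }
}
```

If unpark() happens before the worker reaches park(), the permit is remembered and park() returns immediately, so the sketch does not depend on timing.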
selfInterrupt()
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
selfInterrupt() interrupts the current thread, that is, it sets the thread's interrupt flag to true. Because Thread.interrupted() in parkAndCheckInterrupt() clears the interrupt flag, the interrupt has to be re-asserted in selfInterrupt().
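The clearing behavior of Thread.interrupted() is easy to confirm on its own. In this sketch, InterruptFlagDemo and observe() are illustrative names.

```java
class InterruptFlagDemo {
    // Returns { first read, second read, flag after re-asserting the interrupt }.
    static boolean[] observe() {
        Thread current = Thread.currentThread();
        current.interrupt();                        // set the flag
        boolean first = Thread.interrupted();       // reads true and clears the flag
        boolean second = Thread.interrupted();      // reads false: already cleared
        current.interrupt();                        // what selfInterrupt() does
        boolean restored = current.isInterrupted(); // reads true without clearing
        Thread.interrupted();                       // clean up so the caller is unaffected
        return new boolean[] { first, second, restored };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false true
    }
}
```

Without the re-assertion, code that runs after lock() returns would have no way to learn that an interrupt arrived during the wait.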
The whole process can be illustrated with the following figure.
Exclusive lock release release(int)
release(int) is the entry point for threads to release shared resources in exclusive mode. It will release the specified amount of resources. If it is completely released (ie state=0), it will wake up other threads in the waiting queue to obtain resources.
public final boolean release(int arg) {
    // If the custom tryRelease returns true, the lock is no longer held by any thread
    if (tryRelease(arg)) {
        // Get the head node
        Node h = head;
        // If the head node exists and its waitStatus is not the initial value 0, wake up its successor
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Why is the condition here h != null && h.waitStatus != 0?
- h == null: head has not been initialized yet. Initially head == null; when the first node is enqueued, head is initialized to a dummy node. So if no thread has been enqueued yet, head == null here.
- h != null && waitStatus == 0: the thread of the successor node is still running and does not need to be woken.
- h != null && waitStatus < 0: the successor node may be blocked and needs to be woken.
tryRelease(int)
tryRelease is a template method implemented by subclasses that defines the logic for releasing locks.
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
Because this is exclusive mode, a thread that releases the resource must already hold it, so it can simply subtract the corresponding amount (state -= arg) without worrying about thread safety. Pay attention to the return value, though. As mentioned above, release() uses the return value of tryRelease() to decide whether the resource has been fully released. A custom synchronizer must therefore return true when the resource has been completely released (state=0) and false otherwise.
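The return-value rule matters most when state can exceed 1. Below is a sketch of a reentrant variant in the style of ReentrantLock's internal synchronizer; CountingSync is a made-up name, and overflow checks are left out for brevity.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Reentrant exclusive synchronizer: state counts how many times the owner has acquired.
class CountingSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Only the owner reaches this branch, so a plain write is enough.
            setState(c + acquires);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        }
        int c = getState() - releases; // state -= arg
        boolean free = (c == 0);
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free; // true only when the lock is fully released
    }

    int holdCount() {
        return getState();
    }
}
```

With this class, acquire(1) twice followed by release(1) returns false and leaves holdCount() at 1, so AQS wakes no successor; only the second release(1) returns true.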
unparkSuccessor(Node)
private void unparkSuccessor(Node node) {
    // Get the waitStatus of the given node (the head)
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Get the next node
    Node s = node.next;
    // If the next node is null or cancelled, search for the non-cancelled node closest to the head
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Start from the tail and walk towards the head, remembering the last node seen
        // with waitStatus <= 0, which is the one nearest the head
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If a successor with waitStatus <= 0 was found, unpark its thread
    if (s != null)
        LockSupport.unpark(s.thread);
}
Why is the first non-cancelled node searched for from the back to the front?
Recall the addWaiter method:
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
We can see that enqueueing a node is not a single atomic operation. The pair node.prev = pred; compareAndSetTail(pred, node) can be regarded as the atomic step that makes the node the tail, but at that moment pred.next = node has not been executed yet. If unparkSuccessor runs at this point, a search from front to back cannot reach the new node, so the search has to go from back to front. Another reason is that when a node becomes CANCELLED, its next pointer is disconnected first while its prev pointer is left intact, so only a traversal from back to front is sure to visit every node.
To sum up, a search from front to back may miss nodes, both because enqueueing is not atomic and because the next pointer is disconnected when a node is cancelled. The search therefore runs from the tail towards the head. Once the corresponding thread has been woken, it continues executing in acquireQueued.
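The half-finished enqueue can be frozen in a single-threaded model to make the difference visible. This sketch does not use AQS at all; BackwardScanSketch and its helper methods are illustrative, and the interleaving is simulated by simply not writing the next pointer.

```java
class BackwardScanSketch {
    static final class Node {
        Node prev;
        Node next;
    }

    // Counts nodes reachable by following next from the head.
    static int countForward(Node head) {
        int count = 0;
        for (Node n = head; n != null; n = n.next) {
            count++;
        }
        return count;
    }

    // Counts nodes reachable by following prev from the tail.
    static int countBackward(Node tail) {
        int count = 0;
        for (Node n = tail; n != null; n = n.prev) {
            count++;
        }
        return count;
    }

    // Builds head <-> a, then stops an enqueue of b right after the tail CAS:
    // b.prev is set and tail points at b, but a.next has not been written yet.
    static int[] halfFinishedEnqueue() {
        Node head = new Node();
        Node a = new Node();
        Node b = new Node();
        head.next = a;
        a.prev = head;

        b.prev = a;     // node.prev = pred
        Node tail = b;  // compareAndSetTail(pred, node) succeeded
        // pred.next = node has NOT run yet

        return new int[] { countForward(head), countBackward(tail) };
    }

    public static void main(String[] args) {
        int[] counts = halfFinishedEnqueue();
        System.out.println("forward=" + counts[0] + " backward=" + counts[1]); // forward=2 backward=3
    }
}
```

The forward walk stops at a and never sees b, while the backward walk reaches all three nodes because prev is always written before the node becomes the tail.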
Summary
This article explains the exclusive mode of AQS. The most important methods are acquire() and release(), the two entry points of exclusive mode, and a simple custom demo is included to make them easier to understand. AQS offers much more than this; this article covers only the most basic principle, the exclusive lock, as a starting point. I hope it is helpful.