Principle and application from ReentrantLock to AQS

preface

Java Concurrent contract (JUC:java.util.concurrent) provides many concurrent tools. Among them, many familiar concurrent tools, ReentrantLock and Semaphore, all use a common base class -- AbstractQueuedSynchronizer, or AQS for short. AQS is a framework used to build locks and synchronizers. Using AQS can easily and efficiently build a large number of synchronizers that are widely used, such as ReentrantLock and Semaphore, and other synchronizers such as ReentrantReadWriteLock, SynchronousQueue, FutureTask, etc. are based on AQS. Of course, we can also use AQS to easily construct synchronizers that meet our own needs.

AQS is responsible for managing the state in the synchronizer class. It manages an integer state information, which can be operated through getState, setState, compareAndSetState and other type methods. ReentrantLock indicates that all threads have repeatedly acquired the required number of times, and Semaphore indicates the remaining number of licenses.

ReentrantLock

1.1 ReentrantLock overview

ReentrantLock is a reentrant mutex Lock, also known as "exclusive Lock". The ReentrantLock class implements Lock. It has the same concurrency and memory semantics as synchronized, but adds some features like Lock voting, timed Lock waiting and interruptible Lock waiting. In addition, it provides better performance in the event of intense contention. (in other words, when many threads want to access shared resources, the JVM can spend less time scheduling threads and more time executing threads.)
ReentrantLock locks can only be held by one thread lock at a time point.

Reentrant: ReentrantLock lock, which can be acquired multiple times by a single thread.

ReentrantLock is divided into "fair lock" and "unfair lock". The difference between them lies in whether the lock acquisition mechanism is fair. ReentrantLock can only be obtained by one thread at the same time point (when a thread obtains a lock, other threads must wait); ReentraantLock manages all threads that acquire the lock through a FIFO (first in first out) waiting queue. Under the "fair lock" mechanism, threads queue up to acquire locks in turn; When the "unfair lock" is in the obtainable state, no matter whether you are at the beginning of the queue or not, you will try to obtain the lock without queuing immediately (queue again if you cannot obtain the lock).

Generally, we will compare ReentrantLock and synchronized, which is better or worse. In fact, after java officially optimizes synchronized (biased lock, lightweight lock, adaptive spin, lock elimination, lock coarsening...), The performance difference between the two is not large, but there are differences in some aspects. Here is a simple comparison with a table:

// **************************How Synchronized is used**************************
// 1. for code blocks
synchronized (this) {}
// 2. for objects
synchronized (object) {}
// 3. for method
public synchronized void test () {}
// 4. reentrant
for (int i = 0; i < 100; i++) {
    synchronized (this) {}
}
// **************************How ReentrantLock is used**************************
public void test () throw Exception {
    // 1. initialize and select fair lock and unfair lock
    ReentrantLock lock = new ReentrantLock(true);
    // 2. available for code blocks
    lock.lock();
    try {
        try {
            // 3. it supports multiple locking methods and is flexible; With reentrant feature
            if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
        } finally {
            // 4. manually release the lock
            lock.unlock()
        }
    } finally {
        lock.unlock();
    }
}

1.2 ReentrantLock and AQS

Above, we have a general understanding of ReentrantLock. Next, we can take a look at the relationship between ReentrantLock and AQS. In fact, the underlying layer of ReentrantLock is implemented using AQS. Let's take a look at the source code:

/**ReentrantLock Implemented the Lock class, which has a field, Sync**/
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    
    private final Sync sync;

    /**
     * Sync It inherits the AQS abstract class and is the synchronization control basis of this lock.
     * Next, Sync is subdivided into fair locks and unfair locks.
     * The AQS state is used to indicate the number of lock reentries.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * The abstract lock method implements fair lock and unfair lock respectively.
         */
        abstract void lock();
        ...
    }

    /**
     * Unfair lock
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        ...
    }

    /**
     * Fair lock
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        
        final void lock() {
            acquire(1);
        }
        ...
    }

    /**
     * ReentrantLock The default construction method of, which creates an unfair lock by default
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * The boolean parameter is passed in to determine whether to create a fair lock or an unfair lock.
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

There is little difference between the implementation methods of fair locks and unfair locks in ReentrantLock. The difference lies in whether fair locks directly enter the queue. First, look at the source code of the locking process of unfair locks:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        /**Locking method**/
        final void lock() {
            //If setting the variable State (synchronization State) through CAS succeeds, that is, obtaining the lock succeeds, the current thread will be set as the exclusive thread.
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            //If setting the variable State (synchronization State) through CAS fails, that is, acquiring the lock fails, enter the Acquire method for subsequent processing.
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

Let's take a look at the lock obtaining methods in the fair lock source code:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
        ...
    }

From the source code of fair locks and unfair locks, we can see that both locks are locked through the lock() method, but how is the lock() method locked? With this question in mind, we will continue to track and find that this method belongs to the core method of the parent AQS(AbstractQueuedSynchronizer) of FairSync and NonfairSync.
Before learning about AQS, we have a few questions:

  • As mentioned above, CAS attempts to change the state to indicate whether the lock acquisition succeeds or fails. If the lock acquisition fails, how do you call the acquire() method?
  • What is this thread doing when the lock cannot be obtained? Keep trying to lock? Or suspend waiting for wakeup?

Next, we will introduce AQS and the association between ReentrantLock and AQS in detail.

AQS

Before you learn about AQS, take a look at the overall framework of AQS:

  • In the above figure, the colored ones are Method and the colorless ones are attribute.
  • The AQS framework is divided into five layers, from top to bottom, from shallow to deep, from the API exposed by AQS to the underlying basic data.
  • When a user-defined synchronizer is accessed, you only need to rewrite some of the methods required by the first layer, without paying attention to the specific implementation process of the bottom layer. When the user-defined synchronizer performs lock or unlock operations, it first enters the AQS internal method through the API of the first layer, and then obtains the lock through the second layer. Then, for the process of failed lock acquisition, it enters the waiting queue of the third and fourth layers. These processing methods all depend on the basic data providing layer of the fifth layer.

AQS overview

AQS maintains a shared resource with the int type state of the volatil, and completes the queuing of the thread that obtains the resource through the built-in FIFO. (this built-in synchronization queue is called the "CLH" queue). The queue consists of Node nodes one by one. Each Node maintains a prev reference and a next reference, pointing to its own predecessor and successor nodes respectively. AQS maintains two pointers to the queue head and tail respectively.

In fact, it is a variant double ended bidirectional linked list.
When the thread fails to acquire resources (for example, the attempt to set the state state fails when trying to acquire), it will be constructed as a node to join the CLH queue, and the current thread will be blocked in the queue (implemented through LockSupport.park, which is actually in the waiting state). When the thread holding the synchronization state releases the synchronization state, it will wake up the subsequent node, and then the node thread will continue to participate in the contention for the synchronization state.

Node node

static final class Node {
        /** waitStatus Value indicating that the thread has been canceled (wait timeout or interrupted)*/
        static final int CANCELLED =  1;
        /** waitStatus Value, indicating that subsequent threads need to be awakened (unpaking)*/
        static final int SIGNAL    = -1;
        /**waitStatus Value, indicating that the node thread is waiting on the condition. When it is signal ed, it will be transferred from the waiting queue to the synchronization queue */
        static final int CONDITION = -2;
       /** waitStatus Value, indicating that the next shared synchronization state will be unconditionally propagated
        static final int PROPAGATE = -3;
        /** Waiting status, initially 0 */
        volatile int waitStatus;
        /**The previous node of the current node */
        volatile Node prev;
        /** The next node of the current node */
        volatile Node next;
        /** Threads in the queue associated with the current node */
        volatile Thread thread;
        /** ...... */
    }

Synchronization State

/**
     * The synchronization state.
     */
    private volatile int state;
    
    protected final int getState() {
        return state;
    }

    
    protected final void setState(int newState) {
        state = newState;
    }

    
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

From the source code of state in AQS, it can be seen that all state methods are final modified, indicating that they cannot be overridden in subclasses. We can modify the synchronization state represented by the state field to realize the exclusive mode and shared mode (locking process) of multithreading.
Generally speaking, custom synchronizers are either exclusive or shared. They only need to implement one of tryacquire tryrelease and tryacquireshared tryrereleaseshared. AQS also supports the user-defined synchronizer to realize both exclusive and shared modes, such as ReentrantReadWriteLock. ReentrantLock is an exclusive lock, so tryacquire tryrelease is implemented.

Exclusive mode (understand AQS through ReentrantLock)

Get synchronization status --acquire()

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  • tryAcquire: first, call the tryAcquire method. If true is returned, it means that the synchronization status is obtained successfully, and the following logic will not be executed; If false is returned, that is, failed to obtain the synchronization status, enter the acquireQueued step;
  • Acquirequeueueued: at this time, failed to obtain the synchronization status, constructed an exclusive synchronization node, and added this node to the tail of the synchronization queue through addWatiter (at this time, there may be multiple thread nodes trying to join the tail of the synchronization queue, which need to be added in a thread safe manner);
  • selfInterrupt: this node attempts to obtain the synchronization status in the queue. If the synchronization status cannot be obtained, the node thread will be blocked until it is awakened by the predecessor node or interrupted.

Taking unfair locks as an example, you can see the general process of obtaining locks:

final boolean nonfairTryAcquire(int acquires) {
            //Get current thread
            final Thread current = Thread.currentThread();
            //Get the value of state
            int c = getState();
            //If the state is 0, it means that no other thread is currently occupying shared resources. You can try to obtain a lock
            if (c == 0) {
                //Modify the value of state with CAS
                if (compareAndSetState(0, acquires)) {
                    //Modify successfully, acquire lock successfully, and process the current thread
                    setExclusiveOwnerThread(current);
                    //Return to acquire lock successfully
                    return true;
                }
            }
            //If the state is not 0, judge whether the thread occupying the lock is the current thread
            else if (current == getExclusiveOwnerThread()) {
                //If it is the current thread, increment the state
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                    //Set state
                setState(nextc);
                //Returns the success of lock acquisition (reentrant principle)
                return true;
            }
            return false;
        }
  • Get the current thread;
  • Get the value of state;
  • If the state is 0, it means that no other thread currently occupies shared resources. You can try to obtain a lock;
  • Modify the state value with CAS; Modify successfully, acquire the lock successfully, and process the current thread;
  • Returns the success of obtaining the lock;
  • If state is not 0, judge whether the thread occupying the lock is the current thread;
  • If it is the current thread, increment the state;
  • Returns the success of obtaining the lock (reentrant principle).

The important thing to note about the value is that there is a part of judgment in the fair lock. Let's take a look at the general process of obtaining the lock by the fair lock:

protected final boolean tryAcquire(int acquires) {
            //Get the current thread;
            final Thread current = Thread.currentThread();
            //Get the value of state;
            int c = getState();
            //If the state is 0, it means that no other thread is currently occupying shared resources. You can try to obtain a lock
            if (c == 0) {
                //The method to judge whether there is a valid node in the current waiting queue,
                //If False is returned, it indicates that the current thread can strive for shared resources;
                //If True is returned, it indicates that there are valid nodes in the queue, and the current thread must join the waiting queue.
                //Modify the state value with CAS; Modify successfully, acquire the lock successfully, and process the current thread;
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //If state is not 0, judge whether the thread occupying the lock is the current thread;
            else if (current == getExclusiveOwnerThread()) {
                //If it is the current thread, increment the state;
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                    //Returns the success of obtaining the lock (reentrant principle).
                setState(nextc);
                return true;
            }
            return false;
        }
/**Method to judge whether there is a valid node in the current waiting queue**/
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on whether the header node is initialized
        // Before tail and on head Next being exact if the current node is between the head node and the tail node
        // thread is first in queue Thread is the first in the queue
        // Get current tail node
        Node t = tail; // Read fields in reverse initialization order
        // Get current tail node
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
In a two-way linked list, the first node is a virtual node. In fact, it does not store any information, but only occupies the space.
The real first node with data starts at the second node.
When h != t When: 
    1. If(s = h.next)==null,The waiting queue is being initialized by a thread, but it is only in progress Tail point Head,No Head point Tail,
       At this time, there are elements in the queue, which need to be returned True(See the following for details addWaiter()already enq()Code analysis).
    2. If(s = h.next) != null,Indicates that there is at least one valid node in the queue at this time,
    3. If at this time s.thread ==Thread.currentThread(),Description the thread in the first valid node of the waiting queue is the same as the current thread,
       Then the current thread can obtain resources;
    4. If s.thread != Thread.currentThread(),Note the first valid node thread of the waiting queue is different from the current thread. The current thread must be added to the waiting queue.

Enter queue addWaiter()

When the attempt to acquire the lock is successful, it ends directly and returns;
When the attempt to acquire the lock fails, the next step is taken: the thread is added to the waiting queue addwait()

private Node addWaiter(Node mode) {
        //Create a new Node with the current thread and the incoming lock mode
        Node node = new Node(Thread.currentThread(), mode);
        //Get the current tail node first, and point the tail node to Node pred
        Node pred = tail;
        //If the obtained tail node is not empty
        if (pred != null) {
            //Set the front node of the new node to the tail obtained above (the back direct point of the tail node is null at this time)
            node.prev = pred;
            //Use CAS to modify the value of the tail node (set the new node as the tail node)
            if (compareAndSetTail(pred, node)) {
                //If the setting is successful, modify the post node of the previously obtained pred (tail node before modification) to the currently created node node
                pred.next = node;
                //Returns the new node successfully modified (at this time, the new node is a new tail node)
                return node;
            }
        }
        //If the obtained tail node is null (indicating that there are no elements in the waiting queue),
        //Or the current Pred pointer and Tail point to different positions (indicating that they have been modified by other threads),
        //The above two cases are handled in enq()
        enq(node);
        return node;
    }

If it is not initialized, a header node needs to be initialized. However, please note that the initialized header node is not the current thread node, but the node that calls the parameterless constructor. If there are elements in the queue after initialization or concurrency, the method is the same as before. In fact, addWaiter is an operation to add a tail node to the double ended linked list. It should be noted that the head node of the double ended linked list is the head node of a nonparametric constructor.

private Node enq(final Node node) {
        for (;;) {
            //Get the current tail node pointing to t
            Node t = tail;
            //If the tail node is empty
            if (t == null) { // Must initialize must initialize a header node
                //Use CAS to initialize a header (call nonparametric construction, and the interior of the header and shoulder points are null)
                if (compareAndSetHead(new Node()))
                    //Set the tail node as the head node (at this time, there is only one node. The head node and tail node are one)
                    tail = head;
            } else {
                //If the tail node is not null, set the front node of the new node node to t
                node.prev = t;
                //Modify the tail node through CAS, and point the tail node to a new node,
                //This method is mainly used to compare tailOffset and Expect. If the Node addresses of tailOffset and Expect are the same,
                //Then set the Tail value to the Update value.
                if (compareAndSetTail(t, node)) {
                    //The modification is successful. Set the post node of the previous tail node (now the penultimate node) to the new tail node
                    //Returns the penultimate node
                    t.next = node;
                    return t;
                }
            }
        }
    }

When to dequeue acquireQueued()

Through the above addWaiter() method, we can know that by putting a new Node into the queue to wait for the lock to be obtained, how can we obtain the lock after entering the queue? Next, let's look at how threads acquire locks.
Generally speaking, a thread fails to acquire a lock and is placed in the waiting queue. acquireQueued will keep acquiring the lock until it succeeds or no longer needs to acquire (interrupt).

final boolean acquireQueued(final Node node, int arg) {
        // Mark whether the resource is successfully obtained
        boolean failed = true;
        try {
            //Whether the mark has been interrupted
            boolean interrupted = false;
            //Start spin
            for (;;) {
                //Get the predecessor node of the current node
                final Node p = node.predecessor();
                //If the front node is the head node, it indicates that the current node is the first valid node (the head node is a virtual node), and starts to try to obtain the lock
                if (p == head && tryAcquire(arg)) {
                    //After obtaining the lock, the header pointer moves to the current node
                    //The setHead method sets the current node as a virtual node, but does not modify the waitStatus because it is always needed data.
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // The description is that p is not the head node or p is the head node and the lock is not obtained currently (it may be that the unfair lock is preempted),
                //At this time, it is necessary to judge whether the current node is blocked (blocking condition: the waitStatus of the precursor node is -1), so as to prevent unlimited recycling and waste of resources.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
/**Judge whether the current node is blocked**/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //Get the blocking status of the front node
        int ws = pred.waitStatus;
        //If node Signal -1 status, indicating that the status of the front node is wake-up status
        if (ws == Node.SIGNAL)
            //Can directly park (directly block)
            return true;
           //Waitstatus>0 is the cancel status. If the front node is in the cancel status
        if (ws > 0) {
            // Cycle forward to find the cancellation node and remove the cancellation node from the queue
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
             // Set the waiting status of the front node to SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

Cancel status node generates cancelAcquire()

private void cancelAcquire(Node node) {
  // Filter invalid nodes
    if (node == null)
        return;
  // Set the node not to be associated with any thread, that is, the virtual node
    node.thread = null;
    Node pred = node.prev;
  // Skip the node in cancelled status through the precursor node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
  // Get the successor node of the filtered predecessor node
    Node predNext = pred.next;
  // Set the current node status to CANCELLED
    node.waitStatus = Node.CANCELLED;
  // If the current node is the tail node, set the first non cancelled node from the back to the front as the tail node
  // If the update fails, enter else. If the update succeeds, set the subsequent node of the tail to null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
    // If the current node is not the successor node of the head, 1: judge whether the predecessor node of the current node is SIGNAL; 2: if not, set the predecessor node to SIGNAL to see whether it succeeds
    // If one of 1 and 2 is true, then judge whether the thread of the current node is null
    // If the above conditions are met, point the subsequent pointer of the predecessor node of the current node to the successor node of the current node
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
      // If the current node is the successor node of the head, or if the above conditions are not met, wake up the successor node of the current node
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

The above describes the AQS lock acquisition process from the lock acquisition and locking of ReentrantLock. Later, we will continue to introduce the principle of AQS lock release

reference

Tags: Java aqs

Posted by FredAt on Tue, 31 May 2022 02:50:35 +0530