How Redisson works - Redis distributed locks - scheme / mechanisms - renewal / release / mutual exclusion / reentrancy

Original article: Redisson principle - Redis distributed lock - scheme / mechanism - renewal / release / mutual exclusion / reentry (CSDN blog)


Introduction


Overview

Distributed locks are common in projects, and Redis is the most widely used backend for them. A distributed lock has to deal with renewal, release, reentrancy, mutual exclusion and similar issues. The Redisson client is one of the most complete solutions available today: it renews the lock automatically in the background, the lock can be released when the program finishes or an exception occurs (by calling unlock() in a finally block), and if the whole application crashes the lock expires on its own because nothing renews it any more. Reentrancy and mutual exclusion are handled well too. With Redisson we don't need to write distributed locks ourselves; a hand-written lock rarely covers as many cases as Redisson does.

Advantages of the Redisson distributed lock scheme

  1. Redisson solves the lock-renewal problem with its Watch Dog mechanism.
  2. Compared with ZooKeeper, Redisson is built on Redis and therefore offers higher performance, which suits scenarios with demanding performance requirements.
  3. Redisson provides distributed reentrant locks that work better than a hand-rolled SET mylock userId NX PX milliseconds + Lua implementation. The underlying principle is the same, but Redisson hides the internal execution details from us.
  4. While a client waits for a lock, Redisson also optimizes the waiting itself (it waits for release notifications instead of retrying blindly), which reduces futile lock requests and makes better use of resources.

Issues to consider in distributed locking

  • Safety property: mutual exclusion. At any moment, only one client holds the lock.
  • Liveness property A: no deadlock. Even if the client holding the lock crashes or the network is partitioned, the lock can eventually be acquired again.
  • Liveness property B: fault tolerance. As long as a majority of Redis nodes are alive, clients can acquire and release locks.

Disadvantages of the Redisson distributed lock scheme

Master-slave replication problem

What if Redis goes down? You might think that adding a slave node solves the problem, but it usually does not: it cannot guarantee exclusive access to the resource, because Redis master-slave replication is asynchronous.

This master-slave setup has an obvious race condition:

  1. Client A acquires the lock on the master.
  2. The master goes down before it has replicated the lock key to the slave.
  3. The slave is promoted to master.
  4. Client B acquires a lock on the same resource that client A already holds. Safety is violated!

Such timing coincidences do happen: when a node fails, several clients may hold the lock at the same time. If you can accept this low-probability error, the replication-based scheme is fine.
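To make the race concrete, here is a toy in-memory model in Java. It is a sketch under simplifying assumptions, not Redis or Redisson code: ToyNode and ToyReplicatedRedis are hypothetical classes, a lock is a plain SET NX-style key, and asynchronous replication is modeled as a backlog that reaches the replica only when replicate() is called. Failing over before replicate() runs lets a second client acquire the same lock.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical toy model of one Redis node: just key -> owner. */
class ToyNode {
    final Map<String, String> data = new HashMap<>();

    /** SET key owner NX: succeeds only if the key is absent. */
    boolean setNx(String key, String owner) {
        return data.putIfAbsent(key, owner) == null;
    }
}

/** Hypothetical master/replica pair with asynchronous replication. */
class ToyReplicatedRedis {
    ToyNode master = new ToyNode();
    ToyNode replica = new ToyNode();
    // Writes acknowledged by the master but not yet applied on the replica.
    private final Queue<String[]> backlog = new ArrayDeque<>();

    boolean acquire(String key, String owner) {
        boolean ok = master.setNx(key, owner);
        if (ok) {
            backlog.add(new String[] {key, owner}); // replicated "later"
        }
        return ok;
    }

    /** Ship the backlog to the replica (normally this happens asynchronously). */
    void replicate() {
        String[] write;
        while ((write = backlog.poll()) != null) {
            replica.data.put(write[0], write[1]);
        }
    }

    /** The master dies; the replica is promoted with whatever it has so far. */
    void failover() {
        master = replica;
        replica = new ToyNode();
        backlog.clear(); // writes that never reached the replica are lost
    }
}
```

Calling replicate() before failover() closes the window, and that ordering is exactly what asynchronous replication cannot guarantee.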

System overhead

Some argue that the Watch Dog mechanism, which runs a timer task that keeps extending the lock's expiry, adds load to the system. (This is a claim circulating online; after reviewing a lot of material, the author does not believe it causes significant overhead in real production use. Treat it as reference only.)

Lock mechanism analysis

usage

RLock lock = redisson.getLock("myLock"); 
lock.lock(); 
try {
    // do sth.
} finally {
    lock.unlock(); 
}

Locking mechanism

summary

If lock acquisition fails, the thread subscribes to the lock-release message, blocks until a message arrives, and then retries acquiring the lock in a loop.

Code call process

lock()            //org.redisson.RedissonLock.java#lock
    lock(-1, null, false)
        tryAcquire(-1, leaseTime, unit, threadId)
            tryAcquireAsync(waitTime, leaseTime, unit, threadId)
                 tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG)    // This is the most important method

lock(-1, null, false)


private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // Try to acquire the lock
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // null ttl: lock acquired
    if (ttl == null) {
        return;
    }

    // Subscribe (asynchronously) to the lock's Redis channel to receive unlock messages
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for an unlock message
            if (ttl >= 0) {
                // the lock has an expiry: wait at most ttl ms for a permit on the entry's semaphore
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                // no expiry (ttl < 0): wait until an unlock message releases a permit
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // Unsubscribe
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}
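The acquire, subscribe, block, retry loop above can be sketched in a single JVM with plain JDK classes. This is a hypothetical illustration, not Redisson code: ToyBlockingLock keeps the owner and an expiry timestamp in memory, tryAcquire returns null or the remaining ttl like the Redis script, and a java.util.concurrent.Semaphore stands in for the RedissonLockEntry latch that an unlock message releases. Reentrancy and the watchdog are deliberately left out.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical single-JVM stand-in for the loop in RedissonLock.lock():
 * try once; if the lock is held, block on a semaphore that an "unlock
 * message" releases (or until the holder's ttl runs out), then retry.
 */
class ToyBlockingLock {
    private final long leaseMillis;
    private String owner;     // null while the lock is free
    private long expiresAt;   // wall-clock time at which the holder's lease ends
    // Plays the role of RedissonLockEntry's latch: one permit per unlock message.
    private final Semaphore latch = new Semaphore(0);

    ToyBlockingLock(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    /** Returns null on success, otherwise the holder's remaining ttl in ms. */
    synchronized Long tryAcquire(String who) {
        long now = System.currentTimeMillis();
        if (owner == null || now >= expiresAt) {
            owner = who;
            expiresAt = now + leaseMillis;
            return null;
        }
        return expiresAt - now;
    }

    /** Uninterruptible acquire, like lock() with interruptibly == false. */
    void lock(String who) {
        boolean interrupted = false;
        Long ttl = tryAcquire(who);
        while (ttl != null) {
            try {
                // Wake on an unlock message, or after ttl ms when the lease would expire anyway.
                latch.tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting; restore the flag at the end
            }
            ttl = tryAcquire(who);
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    void unlock(String who) {
        synchronized (this) {
            if (!who.equals(owner)) {
                throw new IllegalMonitorStateException("not locked by " + who);
            }
            owner = null;
        }
        latch.release(); // the "publish UNLOCK_MESSAGE" step
    }

    synchronized String owner() {
        return owner;
    }
}
```

The timed tryAcquire on the semaphore mirrors the design choice in the real loop: a waiter never sleeps past the holder's ttl, so a holder that disappears without publishing an unlock message only delays the waiter until the key would have expired.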

tryAcquire(leaseTime, unit, threadId)

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId)); // acquire asynchronously; get(future) blocks until the result arrives, making the call synchronous
}

tryAcquireAsync(waitTime, leaseTime, unit, threadId)

Summary: this uses Netty's Future listener model: a listener is added to the Future returned by the lock attempt and runs when that attempt completes.

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) { // 1. A lease time was specified: call tryLockInnerAsync directly (no watchdog)
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 2. leaseTime == -1: use the default lease of LOCK_EXPIRATION_INTERVAL_SECONDS (30 s)
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    // 3. Listen on the Future: a null ttlRemaining means the lock was acquired, so start the expiration renewal (watchdog)
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
 
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
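The same control flow can be sketched with the JDK's CompletableFuture in place of Netty/Redisson futures. This is an assumption-laden illustration: ToyAsyncAcquire and its attempt parameter (a hypothetical stand-in for tryLockInnerAsync) are invented names, and incrementing a counter marks the point where scheduleExpirationRenewal would be called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

/**
 * Hypothetical sketch of the tryAcquireAsync() shape using CompletableFuture.
 * "attempt" stands in for tryLockInnerAsync: it takes a lease in ms and
 * completes with null (acquired) or the current holder's ttl.
 */
class ToyAsyncAcquire {
    static final long DEFAULT_LEASE_MS = 30_000; // the 30 s default lease
    final AtomicInteger renewalsStarted = new AtomicInteger();

    CompletableFuture<Long> tryAcquireAsync(long leaseTime, LongFunction<CompletableFuture<Long>> attempt) {
        if (leaseTime != -1) {
            return attempt.apply(leaseTime); // explicit lease: no watchdog
        }
        CompletableFuture<Long> ttlFuture = attempt.apply(DEFAULT_LEASE_MS);
        // Listener: runs when the attempt completes, like addListener/onComplete.
        ttlFuture.whenComplete((ttlRemaining, error) -> {
            if (error == null && ttlRemaining == null) {
                renewalsStarted.incrementAndGet(); // where scheduleExpirationRenewal(threadId) would run
            }
        });
        return ttlFuture;
    }

    /** Like tryAcquire(): block until the asynchronous result is available. */
    Long tryAcquire(long leaseTime, LongFunction<CompletableFuture<Long>> attempt) {
        return tryAcquireAsync(leaseTime, attempt).join();
    }
}
```

Only the "no lease given and acquired" path starts renewal; an explicit lease or a non-null ttl leaves the counter untouched. With an already completed future the listener runs synchronously inside whenComplete, which keeps the example deterministic.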

tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(
        getName(), 
        LongCodec.INSTANCE, 
        command,
          "if (redis.call('exists', KEYS[1]) == 0) then " +
              "redis.call('hset', KEYS[1], ARGV[2], 1); " +
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
              "return nil; " +
          "end; " +
          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
              "return nil; " +
          "end; " +
          "return redis.call('pttl', KEYS[1]);",
        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

Script input parameters

Parameter      | Example value                            | Meaning
Number of keys | 1                                        | Number of KEYS passed to the script
KEYS[1]        | "myLock"                                 | Lock key name
ARGV[1]        | 60000                                    | Lease time for holding the lock, in ms (default 30 s)
ARGV[2]        | 285475da-9152-4c83-822a-67ee2f116a79:52  | Unique ID: Redisson client ID (UUID) + ":" + thread ID

Take a look at the storage structure in Redis:

127.0.0.1:6379> HGETALL myLock
1) "285475da-9152-4c83-822a-67ee2f116a79:52"
2) "1"

Script interpretation

-- If the lock does not exist: create it, set this client's reentry count to 1, set the expiry, and return nil (acquired)
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- If the lock exists and the unique ID matches, this is a reentrant request: increment the reentry count by 1, reset the expiry, and return nil
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- If the lock exists but the unique ID does not match, another thread holds the lock and the current thread cannot acquire it: return the lock's remaining time to live in ms
return redis.call('pttl', KEYS[1]);
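To check the three branches of the script without a Redis server, here is an in-memory Java model. It is a hypothetical sketch: ToyLockScript is an invented class, its hash and expiry maps imitate HSET/HINCRBY/PEXPIRE/PTTL, and a manual nowMillis clock replaces Redis' own key expiry.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory model of the lock script's three branches.
 * A manual clock (nowMillis) replaces Redis' own expiry handling.
 */
class ToyLockScript {
    private final Map<String, Map<String, Long>> hashes = new HashMap<>();
    private final Map<String, Long> expireAt = new HashMap<>();
    long nowMillis = 0;

    private void evictIfExpired(String key) {
        Long deadline = expireAt.get(key);
        if (deadline != null && nowMillis >= deadline) {
            hashes.remove(key);
            expireAt.remove(key);
        }
    }

    /** KEYS[1] = key, ARGV[1] = leaseMs, ARGV[2] = lockId. Returns null when acquired, else pttl. */
    Long tryLock(String key, long leaseMs, String lockId) {
        evictIfExpired(key);
        Map<String, Long> hash = hashes.get(key);
        if (hash == null) {                        // exists == 0: new lock, count = 1
            hash = new HashMap<>();
            hash.put(lockId, 1L);
            hashes.put(key, hash);
            expireAt.put(key, nowMillis + leaseMs);
            return null;
        }
        if (hash.containsKey(lockId)) {            // hexists == 1: re-entry, count + 1
            hash.merge(lockId, 1L, Long::sum);
            expireAt.put(key, nowMillis + leaseMs);
            return null;
        }
        return expireAt.get(key) - nowMillis;      // held by someone else: pttl
    }

    /** Current reentry count for lockId, or null if it does not hold the lock. */
    Long holdCount(String key, String lockId) {
        evictIfExpired(key);
        Map<String, Long> hash = hashes.get(key);
        return hash == null ? null : hash.get(lockId);
    }
}
```

With the clock at 10 000 ms, a different thread ID gets back 20 000 (the pttl of a lock last renewed at 0 with a 30 000 ms lease); once the clock passes the expiry, the same call succeeds.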

Lock mutual exclusion mechanism

summary

What happens if client 2 tries to lock?

First, the first if check runs exists myLock and finds that the lock key myLock already exists. Then the second if check tests whether the myLock hash contains client 2's ID. It does not, because it contains client 1's ID. So client 2 executes:

return redis.call('pttl', KEYS[1]);

This returns a number: the remaining lifetime of the myLock key, in milliseconds.

Here is the main flow of Redisson's tryLock:

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 1. Try to acquire the lock
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }

        // If the first attempt already used up the maximum wait time, acquisition fails
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        current = System.currentTimeMillis();

        /**
         * 2. Subscribe to the lock's release events and wait (await) for the subscription. This avoids wasting resources on futile lock requests:
         * based on a semaphore, when the lock is held by someone else the current thread subscribes to the lock's release event through a Redis channel;
         * once the lock is released, a message is published to notify the waiting threads so they can compete for it.
         *
         * When subscribeFuture.await returns false, the maximum wait time has been exceeded: unsubscribe and report failure.
         * When subscribeFuture.await returns true, enter the loop that tries to acquire the lock.
         */
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // Within the await method, CountDownLatch is used to implement blocking and obtain the result of asynchronous execution of subscribe (Netty's Future is applied)
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            // Calculate the total time taken to acquire the lock. If it is greater than or equal to the maximum waiting time, acquiring the lock fails
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            /**
             * 3. Loop, retrying acquisition until the maximum wait time is used up.
             *    If the lock is acquired, return true immediately;
             *    if the wait time runs out first, acquisition has failed: return false and leave the loop.
             */
            while (true) {
                long currentTime = System.currentTimeMillis();

                // 4. Try to acquire the lock again
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
                // 5. If the maximum wait time has been exceeded, acquisition fails: return false and leave the loop
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                /**
                 * 6. Block until an unlock message arrives (the entry's latch is a Semaphore):
                 */
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    // The holder's lock expires before our wait budget runs out:
                    // wait at most ttl ms for a permit (an unlock message) on the entry's semaphore
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    // Otherwise wait for a permit for at most the remaining wait time
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                // Update the remaining waiting time (maximum waiting time - elapsed blocking time)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 7. Unsubscribe from the unlock message no matter whether the lock is obtained or not
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

Process analysis:

  1. When trying to acquire the lock, a null return means acquisition succeeded. A non-null return means the lock is already held, and the value (ttl) is the lock's remaining lifetime.
  2. If client 2 fails to acquire the lock at this point, its thread subscribes, through a Redis channel, to the lock's release events. If the maximum wait time runs out during this wait, acquisition fails and tryLock returns false. Otherwise it enters a loop that repeatedly tries to acquire the lock.
  3. In each loop iteration it tries to acquire the lock and obtains the remaining lifetime of the existing lock. If the retry succeeds, it returns immediately. If the lock is still held, the thread blocks on a JDK Semaphore while waiting for the release message. When the lock is released and the release message arrives, Semaphore.release() is called, and one of the threads blocked on that Semaphore can try to acquire the lock again.
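Step 6 of tryLock decides how long to block on the semaphore in each iteration. The rule can be isolated into a small pure function; ToyWaitBudget.blockMillis is a hypothetical name, and ttl follows the pttl convention, where a negative value (-1) means the key has no expiry.

```java
/**
 * Hypothetical helper reproducing how long tryLock() blocks on the
 * semaphore in one loop iteration (step 6 of the code above).
 */
final class ToyWaitBudget {
    private ToyWaitBudget() {
    }

    /**
     * @param ttl           remaining lifetime of the holder's lock in ms, as returned by pttl
     * @param remainingWait what is left of the caller's waitTime in ms
     * @return how long to wait for an unlock message before retrying
     */
    static long blockMillis(long ttl, long remainingWait) {
        if (ttl >= 0 && ttl < remainingWait) {
            return ttl;           // the key expires first: retry as soon as it has
        }
        return remainingWait;     // otherwise never wait past the caller's deadline
    }
}
```

Taking the smaller of the two values means a waiter wakes either when the holder's key would expire on its own or when its own deadline arrives, whichever comes first; an unlock message can still wake it earlier.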

Special attention:

There is one detail in this process worth explaining, because it is a key point of the distributed lock: while the lock is held, the process waiting for it does not busy-poll Redis in a tight while(true) loop. Instead, it uses Redis publish/subscribe and blocks (through the await method and the semaphore) until a release message arrives or the ttl elapses. This avoids wasting resources on futile lock requests.

Renewal mechanism

summary

The lock key created by client 1 lives for only 30 seconds by default. What if client 1 needs to keep holding the lock for longer than that?

Redisson provides a renewal mechanism: once client 1 acquires the lock successfully, it starts a Watch Dog. The watchdog is used when leaseTime is not set or is set to -1, for example:

lock.tryLock()
lock.tryLock(xxx, -1, xxx)

Source code  

org.redisson.RedissonLock#tryAcquireAsync 

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

Note: the source above shows that the Watch Dog is enabled only when leaseTime is -1. In other words, to use the watchdog you must keep the default lock time, which comes from the lockWatchdogTimeout setting (30 s by default). If you specify your own lease time, the lock is released automatically once that time passes and is not extended.
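The branch that decides whether the watchdog runs can be summarized in a few lines. ToyLeasePlan is a hypothetical helper, both times are assumed to be in milliseconds, and the one-third renewal period reflects the internalLockLeaseTime / 3 interval the article describes for the watchdog.

```java
/**
 * Hypothetical summary of the leaseTime branch in tryAcquireAsync():
 * an explicit lease is used as-is and never renewed; -1 means "use the
 * watchdog timeout and keep renewing it".
 */
final class ToyLeasePlan {
    final long leaseMillis;
    final boolean watchdog;

    private ToyLeasePlan(long leaseMillis, boolean watchdog) {
        this.leaseMillis = leaseMillis;
        this.watchdog = watchdog;
    }

    static ToyLeasePlan of(long leaseTimeMillis, long lockWatchdogTimeoutMillis) {
        if (leaseTimeMillis != -1) {
            return new ToyLeasePlan(leaseTimeMillis, false); // fixed lease, expires on its own
        }
        return new ToyLeasePlan(lockWatchdogTimeoutMillis, true);
    }

    /** How often the watchdog would renew (lease / 3), or 0 when there is no watchdog. */
    long renewalPeriodMillis() {
        return watchdog ? leaseMillis / 3 : 0;
    }
}
```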

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()),
        internalLockLeaseTime, getLockName(threadId));
}

The Watch Dog is really a background scheduled task. After a thread acquires the lock, its thread ID is registered in RedissonLock.EXPIRATION_RENEWAL_MAP, and a renewal task runs every internalLockLeaseTime / 3 (10 seconds by default). The task takes the thread ID from EXPIRATION_RENEWAL_MAP and runs the renewal script above: if client 1 still holds the lock (hexists finds its client ID and thread ID in the lock hash), the key's expiry is reset with pexpire. As long as client 1 holds the lock, the lock key's lifetime keeps being extended.

Note one detail: if the service goes down, the Watch Dog thread goes down with it, so the key's expiry is no longer extended. The key expires automatically after 30 s, and other threads can then acquire the lock.
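The self-rescheduling renewal loop can be sketched with a ScheduledExecutorService. This is a hypothetical stand-in, not Redisson's implementation (which uses Netty timers): ToyWatchdog is an invented class, and renewIfStillHeld represents the hexists + pexpire script, returning true while this client still owns the lock. Renewal stops as soon as the check fails or cancel() is called on unlock, and because the timer lives inside the process, a crashed service simply stops renewing.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical watchdog sketch: every leaseMillis / 3 run a renewal check
 * and reschedule only while it reports that this client still holds the lock.
 */
class ToyWatchdog {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "toy-watchdog");
        t.setDaemon(true); // does not keep the JVM alive on its own
        return t;
    });
    private final long leaseMillis;
    private final BooleanSupplier renewIfStillHeld; // stands in for hexists + pexpire
    private volatile ScheduledFuture<?> pending;
    private volatile boolean cancelled;

    ToyWatchdog(long leaseMillis, BooleanSupplier renewIfStillHeld) {
        this.leaseMillis = leaseMillis;
        this.renewIfStillHeld = renewIfStillHeld;
    }

    void start() {
        scheduleNext();
    }

    private void scheduleNext() {
        if (cancelled) {
            return;
        }
        try {
            pending = timer.schedule(() -> {
                if (!cancelled && renewIfStillHeld.getAsBoolean()) {
                    scheduleNext(); // like renewExpiration() rescheduling itself
                }
            }, leaseMillis / 3, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // cancel() already shut the timer down: nothing left to renew
        }
    }

    /** Called on unlock, in the spirit of cancelExpirationRenewal(). */
    void cancel() {
        cancelled = true;
        ScheduledFuture<?> p = pending;
        if (p != null) {
            p.cancel(false);
        }
        timer.shutdown();
    }
}
```

Rescheduling one task at a time, instead of using a fixed-rate schedule, means a renewal that reports "no longer held" ends the loop by itself, without a separate cleanup step.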

Reentrant locking mechanism

Redisson also supports reentrant locks, such as the following code:

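@Override
public void lock() {
    RLock lock = redissonSingle.getLock("myLock");
    lock.lock();                 // first acquisition: hold count = 1
    try {
        lock.lock();             // same thread again (reentry): hold count = 2
        try {
            // Execute business
            doBusiness();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();       // hold count 2 -> 1
        }
    } finally {
        // Release lock: hold count 1 -> 0, the key is deleted
        lock.unlock();
        logger.info("The task is completed, Release lock!");
    }
}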

Let's analyze the locking lua code again:

if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
return redis.call('pttl', KEYS[1]);

The first if check is definitely false: exists myLock shows that the lock key already exists. The second if check is true, because the myLock hash contains client 1's ID. So the reentrant branch runs: hincrby myLock 285475da-9152-4c83-822a-67ee2f116a79:52 1 adds 1 to client 1's lock count. The myLock data structure now looks like this:

127.0.0.1:6379> HGETALL myLock
1) "285475da-9152-4c83-822a-67ee2f116a79:52"
2) "2"

So the hash's key is the lock name, the field is the client ID (UUID plus thread ID), and the value is the number of times that client has acquired the lock.

Lock release mechanism

summary

Releasing the lock relies on Redis publish/subscribe.

Releasing the lock takes three steps:

  1. Delete the lock, taking reentrancy into account: the hold count is decremented first, and the key is deleted only when the count reaches 0.
  2. Broadcast a lock-release message to wake the blocked waiters (publish an UNLOCK_MESSAGE to the channel named redisson_lock__channel).
  3. Cancel the Watch Dog: remove the thread ID from RedissonLock.EXPIRATION_RENEWAL_MAP and cancel the Netty scheduled renewal task.

Source code

Implementation

Calling lock.unlock() releases the distributed lock. Here is the code path that releases it:

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    // 1. Asynchronous release lock
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    // Cancel the Watch Dog mechanism
    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // If this client/thread does not hold the lock (no matching field in the hash), return nil
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // Decrement this client's hold count. If it is still > 0, refresh the expiry and return 0;
            // otherwise delete the key, publish an UNLOCK_MESSAGE on the redisson_lock__channel channel, and return 1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
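The return-value contract of the unlock script, and how unlockAsync() maps a nil result to an exception, can be modeled in memory. ToyUnlockScript is a hypothetical class; published is a simple log standing in for the Redis PUBLISH, and the pexpire refresh on the still-held path is only noted in a comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory model of the unlock script's results:
 * null = caller does not hold the lock, FALSE (0) = still held (reentrant),
 * TRUE (1) = key deleted and an unlock message published.
 */
class ToyUnlockScript {
    final Map<String, Map<String, Long>> hashes = new HashMap<>();
    final List<String> published = new ArrayList<>(); // log of "channel <- message"

    Boolean unlock(String key, String channel, String lockId) {
        Map<String, Long> hash = hashes.get(key);
        if (hash == null || !hash.containsKey(lockId)) {    // hexists == 0
            return null;
        }
        long counter = hash.merge(lockId, -1L, Long::sum);  // hincrby ... -1
        if (counter > 0) {
            return false; // the real script also refreshes the expiry (pexpire) here
        }
        hashes.remove(key);                                  // del
        published.add(channel + " <- UNLOCK_MESSAGE");       // publish
        return true;
    }

    /** Mirrors unlockAsync(): a null script result becomes IllegalMonitorStateException. */
    void unlockOrThrow(String key, String channel, String lockId) {
        if (unlock(key, channel, lockId) == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread: " + lockId);
        }
    }
}
```

Only the final release (count reaching 0) publishes a message, so waiters are not woken while the owner still holds the lock reentrantly.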


Posted by chwebdesigns on Sat, 25 Sep 2021 15:25:07 +0530