redisson 实现分布式锁的源码解析

作者:心城以北 时间:2022-06-05 05:38:47 

redisson 实现分布式锁的源码解析

redisson

redisson 实现分布式锁的机制如下:

redisson 实现分布式锁的源码解析

依赖版本

implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'

测试代码

下面是模拟一个商品秒杀的场景,示例代码如下:

public class RedissonTest {
   public static void main(String[] args) {
       //1. 配置部分
       Config config = new Config();
       String address = "redis://127.0.0.1:6379";
       SingleServerConfig serverConfig = config.useSingleServer();
       serverConfig.setAddress(address);
       serverConfig.setDatabase(0);
       config.setLockWatchdogTimeout(5000);
       Redisson redisson = (Redisson) Redisson.create(config);
       RLock rLock = redisson.getLock("goods:1000:1");
       //2. 加锁
       rLock.lock();
       try {
           System.out.println("todo 逻辑处理 1000000.");
       } finally {
           if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
               //3. 解锁
               rLock.unlock();
           }
       }
   }
}

加锁设计

rLock.lock();是加锁的核心代码,我们一起来看看调用栈

redisson 实现分布式锁的源码解析

加锁的核心方法是:org.redisson.RedissonLock#tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
       return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
               "if (redis.call('exists', KEYS[1]) == 0) then " +
                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                       "return nil; " +
                       "end; " +
                       "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                       "return nil; " +
                       "end; " +
                       "return redis.call('pttl', KEYS[1]);",
               Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
   }

其实它的本质是调用一段 LUA 脚本进行加锁。

锁续期设计

锁的续期是在 org.redisson.RedissonLock#tryAcquireAsync方法中调用 scheduleExpirationRenewal实现的。

续期需要注意的是,看门狗是设置在主线程的延迟队列的线程中。

tryAcquireAsync 代码如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
   RFuture<Long> ttlRemainingFuture;
   if (leaseTime != -1) {
       ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
   } else {
       ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
   }

CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
       // lock acquired
       if (ttlRemaining == null) {
           if (leaseTime != -1) {
               internalLockLeaseTime = unit.toMillis(leaseTime);
           } else {
               // 锁过期时间续期
               scheduleExpirationRenewal(threadId);
           }
       }
       return ttlRemaining;
   });
   return new CompletableFutureWrapper<>(f);
}

锁续期 scheduleExpirationRenewal代码如下:

protected void scheduleExpirationRenewal(long threadId) {
   ExpirationEntry entry = new ExpirationEntry();
   ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
   if (oldEntry != null) {
       oldEntry.addThreadId(threadId);
   } else {
       entry.addThreadId(threadId);
       try {
           renewExpiration();
       } finally {
           if (Thread.currentThread().isInterrupted()) {
               cancelExpirationRenewal(threadId);
           }
       }
   }
}

然后在调用 renewExpiration();执行续期逻辑

private void renewExpiration() {
   ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
   if (ee == null) {
       return;
   }
   // 创建延迟任务
   Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
       @Override
       public void run(Timeout timeout) throws Exception {
           ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
           if (ent == null) {
               return;
           }
           Long threadId = ent.getFirstThreadId();
           if (threadId == null) {
               return;
           }
           // 真正的续期,调用 LUA 脚本续期
           RFuture<Boolean> future = renewExpirationAsync(threadId);
           future.whenComplete((res, e) -> {
               if (e != null) {
                   log.error("Can't update lock " + getRawName() + " expiration", e);
                   EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                   return;
               }

// 如果续期成功
               if (res) {
                   // reschedule itself
                   renewExpiration();
               } else {
                   cancelExpirationRenewal(null);
               }
           });
       }
   }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
   ee.setTimeout(task);
}

renewExpirationAsync方法, 里面还是一段 LUA 脚本,进行重新设置锁的过期时间。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
       return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                       "return 1; " +
                       "end; " +
                       "return 0;",
               Collections.singletonList(getRawName()),
               internalLockLeaseTime, getLockName(threadId));
   }

锁的自旋重试

org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)在执行获取锁失败的时候,会进入重试。其实这里就会执行 18 行以后的 while (true) 逻辑

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
   long threadId = Thread.currentThread().getId();
   Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
   // lock acquired
   if (ttl == null) {
       return;
   }

CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
   RedissonLockEntry entry;
   if (interruptibly) {
       entry = commandExecutor.getInterrupted(future);
   } else {
       entry = commandExecutor.get(future);
   }

try {
       while (true) {
           ttl = tryAcquire(-1, leaseTime, unit, threadId);
           // lock acquired
           if (ttl == null) {
               break;
           }

// waiting for message
           if (ttl >= 0) {
               try {
                   // 阻塞锁的超时时间,等锁过期后再尝试加锁
                   entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
               } catch (InterruptedException e) {
                   if (interruptibly) {
                       throw e;
                   }
                   entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
               }
           } else {
               if (interruptibly) {
                   entry.getLatch().acquire();
               } else {
                   entry.getLatch().acquireUninterruptibly();
               }
           }
       }
   } finally {
       unsubscribe(entry, threadId);
   }
//        get(lockAsync(leaseTime, unit));
}

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);其实这里就是一个间歇性自旋。 等到上次锁过期的时间,在唤醒进行抢锁 entry.getLatch().acquire();

还有一个逻辑就是

CompletableFuture future = subscribe(threadId);

这里其实是会订阅一个消息,如果解锁过后,会发布解锁的消息。

解锁设计

rLock.unlock(); 的核心就是释放锁,撤销续期和唤醒在等待加锁的线程(发布解锁成功消息)。

核心方法(解锁): org.redisson.RedissonLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
       return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
               "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                       "return nil;" +
                       "end; " +
                       "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                       "if (counter > 0) then " +
                       "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                       "return 0; " +
                       "else " +
                       "redis.call('del', KEYS[1]); " +
                       // 发布解锁成功消息
                       "redis.call('publish', KEYS[2], ARGV[1]); " +
                       "return 1; " +
                       "end; " +
                       "return nil;",
               Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
   }

还是 LUA 的执行方式。

撤销锁续期

核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)

@Override
public RFuture<Void> unlockAsync(long threadId) {
   // 解锁
   RFuture<Boolean> future = unlockInnerAsync(threadId);
   // 撤销续期
   CompletionStage<Void> f = future.handle((opStatus, e) -> {
       cancelExpirationRenewal(threadId);
       if (e != null) {
           throw new CompletionException(e);
       }
       if (opStatus == null) {
           IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                   + id + " thread-id: " + threadId);
           throw new CompletionException(cause);
       }
       return null;
   });
   return new CompletableFutureWrapper<>(f);
}

解锁成功唤排队线程

org.redisson.pubsub.LockPubSub#onMessage中回去唤醒阻塞的线程,让执行前面的锁自旋逻辑,具体代码如下:

@Override
protected void onMessage(RedissonLockEntry value, Long message) {
   if (message.equals(UNLOCK_MESSAGE)) {
       Runnable runnableToExecute = value.getListeners().poll();
       if (runnableToExecute != null) {
           runnableToExecute.run();
       }
       value.getLatch().release();
   } else if (message.equals(READ_UNLOCK_MESSAGE)) {
       while (true) {
           Runnable runnableToExecute = value.getListeners().poll();
           if (runnableToExecute == null) {
               break;
           }
           runnableToExecute.run();
       }
       value.getLatch().release(value.getLatch().getQueueLength());
   }
}

来源:https://juejin.cn/post/7093149727260147749

标签:redisson,分布式锁
0
投稿

猜你喜欢

  • Java+MySQL 图书管理系统

    2023-11-28 19:28:43
  • BroadcastReceiver静态注册案例详解

    2022-12-30 18:31:10
  • Android开发改变字体颜色方法

    2022-11-10 05:51:01
  • Android Internet应用实现获取天气预报的示例代码

    2023-09-26 04:13:22
  • Spring Data环境搭建实现过程解析

    2022-02-26 20:13:38
  • Java数据结构之单链表详解

    2023-11-04 17:02:20
  • springboot2.0和springcloud Finchley版项目搭建(包含eureka,gateWay,Freign,Hystrix)

    2021-09-14 22:57:38
  • springboot+mybatis-plus 两种方式打印sql语句的方法

    2022-12-29 13:41:11
  • Android网易有道词典案例源码分享

    2022-09-18 20:53:25
  • Android多媒体应用使用MediaPlayer播放音频

    2023-07-08 15:37:52
  • 简单实现Android弹出菜单效果

    2023-01-11 18:58:46
  • monkeyrunner之安卓开发环境搭建教程(1)

    2023-02-24 06:24:22
  • Android PopupWindow被输入法弹上去之后无法恢复原位的解决办法

    2023-07-24 12:23:31
  • C# XML基础入门小结(XML文件内容增删改查清)

    2022-10-18 17:12:02
  • java判断回文数示例分享

    2023-03-20 03:18:22
  • MyBatis插入数据返回主键的介绍

    2023-10-26 03:15:21
  • Java实现的基于socket通信的实例代码

    2021-12-30 19:06:50
  • 如何在IDEA Maven项目中导入本地jar包的步骤

    2023-03-25 06:43:48
  • Android转场效果实现示例浅析

    2023-09-21 12:10:17
  • java编程abstract类和方法详解

    2023-12-15 06:08:46
  • asp之家 软件编程 m.aspxhome.com