InterProcessMutex实现zookeeper分布式锁原理

作者:冬雪是你 时间:2023-08-11 05:46:43 

原理简介:

zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁

zookeeper节点图分析

InterProcessMutex实现zookeeper分布式锁原理

InterProcessMutex实现的锁机制是公平且互斥的,公平的方式是按照每个请求的顺序进行排队的。

InterProcessMutex实现的InterProcessLock接口,InterProcessLock主要规范了如下几个方法:

// 获取互斥锁
public void acquire() throws Exception;
// 在给定的时间内获取互斥锁
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 释放锁处理
public void release() throws Exception;
// 如果此JVM中的线程获取了互斥锁,则返回true
boolean isAcquiredInThisProcess();

接下来我们看看InterProcessMutex中的实现,它究竟有哪些属性,以及实现细节

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
   // LockInternals是真正实现操作zookeeper的类,它内部包含连接zookeeper客户端的CuratorFramework
   // LockInternals的具体实现后面我会讲到
   private final LockInternals internals;
 // basePath是锁的根结点,所有的临时有序的节点都是basePath的子节点,
   private final String basePath;
   //
   private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
// LockData封装了请求对应的线程(owningThread)、锁的重入的次数(lockCount)、线程对应的临时节点(lockPath)
   private static class LockData {
       final Thread owningThread;
       final String lockPath;
     // 原子性的
       final AtomicInteger lockCount = new AtomicInteger(1);

private LockData(Thread owningThread, String lockPath)
       {
           this.owningThread = owningThread;
           this.lockPath = lockPath;
       }
   }

private static final String LOCK_NAME = "lock-";
   // 获取互斥锁,阻塞【InterProcessLock的实现】
   @Override
   public void acquire() throws Exception {
     // 获取锁,一直等待
       if ( !internalLock(-1, null) ) {
           throw new IOException("Lost connection while trying to acquire lock: " + basePath);
       }
   }
   // 获取互斥锁,指定时间time【InterProcessLock的实现】
   @Override
   public boolean acquire(long time, TimeUnit unit) throws Exception {
       return internalLock(time, unit);
   }
   // 当前线程是否占用锁中【InterProcessLock的实现】
   @Override
   public boolean isAcquiredInThisProcess() {
       return (threadData.size() > 0);
   }
   //如果调用线程与获取互斥锁的线程相同,则执行一次互斥锁释放。如果线程已多次调用acquire,当此方法返回时,互斥锁仍将保留 【InterProcessLock的实现】
   @Override
   public void release() throws Exception {
       Thread currentThread = Thread.currentThread(); //当前线程
       LockData lockData = threadData.get(currentThread); //线程对应的锁信息
       if ( lockData == null ) {
           throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
       }
     // 因为获取到的锁是可重入的,对lockCount进行减1,lockCount=0时才是真正释放锁
       int newLockCount = lockData.lockCount.decrementAndGet();
       if ( newLockCount > 0 ) {
           return;
       }
       if ( newLockCount < 0 ) {
           throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
       }
       try {
         // 到这里时lockCount=0,具体释放锁的操作交给LockInternals中的releaseLock方法实现
           internals.releaseLock(lockData.lockPath);
       }
       finally {
           threadData.remove(currentThread);
       }
   }
 // 获取basePath根结点下的所有临时节点的有序集合
 public Collection<String> getParticipantNodes() throws Exception {
       return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
   }

boolean isOwnedByCurrentThread() {
       LockData lockData = threadData.get(Thread.currentThread());
       return (lockData != null) && (lockData.lockCount.get() > 0);
   }
 protected String getLockPath() {
       LockData lockData = threadData.get(Thread.currentThread());
       return lockData != null ? lockData.lockPath : null;
   }
 // acquire()中调用的internalLock()方法
 private boolean internalLock(long time, TimeUnit unit) throws Exception {
       Thread currentThread = Thread.currentThread();
       LockData lockData = threadData.get(currentThread);
       if ( lockData != null ) {
           // 如果当前线程已经获取到了锁,那么将重入次数lockCount+1,返回true
           lockData.lockCount.incrementAndGet();
           return true;
       }
     // attemptLock方法是获取锁的真正实现,lockPath是当前线程成功在basePath下创建的节点,若lockPath不为空代表成功获取到锁
       String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
       if ( lockPath != null ) {
         // lockPath封装到当前线程对应的锁信息中
           LockData newLockData = new LockData(currentThread, lockPath);
           threadData.put(currentThread, newLockData);
           return true;
       }
       return false;
   }
}

接下来我们看看InterProcessMutex中使用的LockInternals类的实现细节

public class LockInternals {
   private final CuratorFramework                  client; // 连接zookeeper的客户端
   private final String                            path;// 等于basePath,InterProcessMutex中传进来的
   private final String                            basePath; // 根结点
   private final LockInternalsDriver               driver; // 操作zookeeper节点的driver
   private final String                            lockName; // "lock-"
   private final AtomicReference<RevocationSpec>   revocable = new AtomicReference<RevocationSpec>(null);

private final CuratorWatcher                    revocableWatcher = new CuratorWatcher() {
       @Override
       public void process(WatchedEvent event) throws Exception {
           if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) {
               checkRevocableWatcher(event.getPath());
           }
       }
   };
   // 监听节点的 * ,若被监听的节点有动静,则唤醒 notifyFromWatcher()=>notifyAll();
  private final Watcher watcher = new Watcher() {
       @Override
       public void process(WatchedEvent event) {
           notifyFromWatcher();
       }
   };
  private volatile int    maxLeases;
 // 获取basePath的子节点,排序后的
 public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
   {
       List<String> children = client.getChildren().forPath(basePath);
       List<String> sortedList = Lists.newArrayList(children);
       Collections.sort
       (
           sortedList,
           new Comparator<String>()
           {
               @Override
               public int compare(String lhs, String rhs)
               {
                   return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
               }
           }
       );
       return sortedList;
   }
 // 尝试获取锁【internalLock=>attemptLock】
 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
   {// 开始时间
       final long      startMillis = System.currentTimeMillis();
     // 记录等待时间
       final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
       final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
     // 重试次数
       int             retryCount = 0;
     // 当前节点
       String          ourPath = null;
     // 是否获取到锁的标志
       boolean         hasTheLock = false;
     // 是否放弃获取到标志
       boolean         isDone = false;
     // 不停尝试获取
       while ( !isDone )
       {
           isDone = true;

try
           {// 创建当前线程对应的节点
               ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
             // internalLockLoop中获取
               hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
           }
           catch ( KeeperException.NoNodeException e )
           {// 是否可再次尝试
               if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
               {
                   isDone = false;
               }
               else
               {
                   throw e;
               }
           }
       }
// 获取到锁后,返回当前线程对应创建的节点路径
       if ( hasTheLock )
       {
           return ourPath;
       }

return null;
   }
 // 循环获取【attemptLock=>internalLockLoop】
 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
   {
       boolean     haveTheLock = false; // 是否拥有分布式锁
       boolean     doDelete = false;// 是否需要删除当前节点
       try
       {
           if ( revocable.get() != null )
           {
               client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
           }
// 循环尝试获取锁
           while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
           {// 得到basePath下排序后的临时子节点
               List<String>        children = getSortedChildren();
             // 获取之前创建的当前线程对应的子节点
               String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 判断是否获取到锁,没有就返回监听路径
               PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
             // 成功获取到
               if ( predicateResults.getsTheLock() )
               {
                   haveTheLock = true;
               }
               else
               {// 没有获取到锁,监听前一个临时顺序节点
                   String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

synchronized(this)
                   {
                       try
                       {                          
// 上一个临时顺序节点如果被删除,会唤醒当前线程继续竞争锁  
                         client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                           if ( millisToWait != null )
                           {
                               millisToWait -= (System.currentTimeMillis() - startMillis);
                               startMillis = System.currentTimeMillis();
                             // 获取锁超时
                               if ( millisToWait <= 0 )
                               {
                                   doDelete = true;    // timed out - delete our node
                                   break;
                               }

wait(millisToWait);
                           }
                           else
                           {
                               wait();
                           }
                       }
                       catch ( KeeperException.NoNodeException e )
                       {
                           // it has been deleted (i.e. lock released). Try to acquire again
                       }
                   }
               }
           }
       }
       catch ( Exception e )
       {
           ThreadUtils.checkInterrupted(e);
           doDelete = true;
           throw e;
       }
       finally
       {
           if ( doDelete )
           {
             // 因为获取锁超时,所以删除之前创建的临时子节点
               deleteOurPath(ourPath);
           }
       }
       return haveTheLock;
   }

private void deleteOurPath(String ourPath) throws Exception {
       try
       {
         // 删除
           client.delete().guaranteed().forPath(ourPath);
       }
       catch ( KeeperException.NoNodeException e )
       {
           // ignore - already deleted (possibly expired session, etc.)
       }
   }

}

StandardLockInternalsDriver implements LockInternalsDriver

// 前面internalLockLoop方法中driver.getsTheLock执行的方法
@Override
   public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
   {
   // 获取子节点在临时顺序节点列表中的位置
       int             ourIndex = children.indexOf(sequenceNodeName);
       // 检验子节点在临时顺序节点列表中是否有效
       validateOurIndex(sequenceNodeName, ourIndex);
       // 若当前子节点的位置<maxLeases,代表可获取锁【maxLeases默认=1,若ourIndex=0,代笔自己位置最小】
       boolean         getsTheLock = ourIndex < maxLeases;
       // getsTheLock=true,则不需要监听前maxLeases的节点【maxLeases默认=1,代表监听前面最靠近自己的节点】
       String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

return new PredicateResults(pathToWatch, getsTheLock);
   }

用InterProcessMutex在自己业务实现分布式锁,请点击此链接阅读点我

来源:https://blog.csdn.net/m0_45097637/article/details/123591672

标签:InterProcessMutex,zookeeper,分布式锁
0
投稿

猜你喜欢

  • 详解spring boot集成ehcache 2.x 用于hibernate二级缓存

    2023-06-25 05:49:44
  • FasfDFS整合Java实现文件上传下载功能实例详解

    2022-05-27 02:30:46
  • java synchronized用法详解

    2022-06-22 00:38:03
  • WPF+DiffPlex实现文本比对工具

    2022-04-20 21:32:07
  • SpringBoot打War包上传到阿里云的LINUX服务器的操作方法

    2021-09-26 22:27:01
  • C# 开发(创蓝253)手机短信验证码接口的实例

    2023-05-22 11:15:54
  • 简单了解java自定义和自然排序

    2022-01-29 02:47:26
  • Spring的@Validation和javax包下的@Valid区别以及自定义校验注解

    2021-06-20 04:06:35
  • C# List实现行转列的通用方案

    2022-03-28 02:53:04
  • ElasticSearch查询文档基本操作实例

    2023-11-24 14:20:02
  • Android入门教程之Fragment的具体使用详解

    2021-09-30 01:21:57
  • java实现上传图片尺寸修改和质量压缩

    2023-04-04 03:42:55
  • springboot max-http-header-size最大长度的那些事及JVM调优方式

    2021-12-14 03:43:46
  • SpringBoot Knife4j在线API文档框架基本使用

    2022-03-10 21:27:48
  • Android开发中ImageLoder加载网络图片时将图片设置为ImageView背景的方法

    2021-12-14 14:58:38
  • MyBatis-plus实现逆向生成器

    2021-08-26 16:37:14
  • C#中分部类和分部方法的应用

    2022-08-16 06:49:05
  • Flutter学习之实现自定义themes详解

    2022-04-17 17:20:49
  • C#实现网络小程序的步骤详解

    2023-08-17 18:16:37
  • SpringBoot 整合 ElasticSearch操作各种高级查询搜索

    2023-03-25 17:12:40
  • asp之家 软件编程 m.aspxhome.com