浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

作者:LJY_SUPER 时间:2023-11-16 08:14:56 

通过zookeeper实现分布式锁

1、创建zookeeper的client

首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client


public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ContractFileInfoController.class);
private String connectionString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
private RetryPolicy retryPolicy;
private CuratorFramework client;
public CuratorFactoryBean(String connectionString) {
 this(connectionString, 500, 500);
}
public CuratorFactoryBean(String connectionString, int sessionTimeoutMs, int connectionTimeoutMs) {
 this.connectionString = connectionString;
 this.sessionTimeoutMs = sessionTimeoutMs;
 this.connectionTimeoutMs = connectionTimeoutMs;
}
@Override
public void destroy() throws Exception {
 LOGGER.info("Closing curator framework...");
 this.client.close();
 LOGGER.info("Closed curator framework.");
}
@Override
public CuratorFramework getObject() throws Exception {
 return this.client;
}
@Override
public Class<?> getObjectType() {
  return this.client != null ? this.client.getClass() : CuratorFramework.class;
}
@Override
public boolean isSingleton() {
 return true;
}
@Override
public void afterPropertiesSet() throws Exception {
 if (StringUtils.isEmpty(this.connectionString)) {
  throw new IllegalStateException("connectionString can not be empty.");
 } else {
  if (this.retryPolicy == null) {
   this.retryPolicy = new ExponentialBackoffRetry(1000, 2147483647, 180000);
  }
this.client = CuratorFrameworkFactory.newClient(this.connectionString, this.sessionTimeoutMs, this.connectionTimeoutMs, this.retryPolicy);
  this.client.start();
  this.client.blockUntilConnected(30, TimeUnit.MILLISECONDS);
 }
}
public void setConnectionString(String connectionString) {
 this.connectionString = connectionString;
}
public void setSessionTimeoutMs(int sessionTimeoutMs) {
 this.sessionTimeoutMs = sessionTimeoutMs;
}
public void setConnectionTimeoutMs(int connectionTimeoutMs) {
 this.connectionTimeoutMs = connectionTimeoutMs;
}
public void setRetryPolicy(RetryPolicy retryPolicy) {
 this.retryPolicy = retryPolicy;
}
public void setClient(CuratorFramework client) {
 this.client = client;
}
}

2、封装分布式锁

根据CuratorFramework创建InterProcessMutex(分布式可重入排它锁)对一行数据进行上锁


public InterProcessMutex(CuratorFramework client, String path) {
 this(client, path, new StandardLockInternalsDriver());
}

使用 acquire方法
1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(long time, TimeUnit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。


public void acquire() throws Exception {
 if (!this.internalLock(-1L, (TimeUnit)null)) {
  throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
 }
}
public boolean acquire(long time, TimeUnit unit) throws Exception {
 return this.internalLock(time, unit);
}

释放锁 mutex.release();


public void release() throws Exception {
 Thread currentThread = Thread.currentThread();
 InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
 if (lockData == null) {
  throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
 } else {
  int newLockCount = lockData.lockCount.decrementAndGet();
  if (newLockCount <= 0) {
   if (newLockCount < 0) {
    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
   } else {
    try {
     this.internals.releaseLock(lockData.lockPath);
    } finally {
     this.threadData.remove(currentThread);
    }
}
  }
 }
}

封装后的DLock代码
1、调用InterProcessMutex processMutex = dLock.mutex(path);

2、手动释放锁processMutex.release();

3、需要手动删除路径dLock.del(path);

推荐 使用:
都是 函数式编程
在业务代码执行完毕后 会释放锁和删除path
1、这个有返回结果
public T mutex(String path, ZkLockCallback zkLockCallback, long time, TimeUnit timeUnit)
2、这个无返回结果
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit)


public class DLock {
private final Logger logger;
private static final long TIMEOUT_D = 100L;
private static final String ROOT_PATH_D = "/dLock";
private String lockRootPath;
private CuratorFramework client;
public DLock(CuratorFramework client) {
 this("/dLock", client);
}
public DLock(String lockRootPath, CuratorFramework client) {
 this.logger = LoggerFactory.getLogger(DLock.class);
 this.lockRootPath = lockRootPath;
 this.client = client;
}
public InterProcessMutex mutex(String path) {
 if (!StringUtils.startsWith(path, "/")) {
  path = Constant.keyBuilder(new Object[]{"/", path});
 }
return new InterProcessMutex(this.client, Constant.keyBuilder(new Object[]{this.lockRootPath, "", path}));
}
public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback) throws ZkLockException {
 return this.mutex(path, zkLockCallback, 100L, TimeUnit.MILLISECONDS);
}
public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {
 String finalPath = this.getLockPath(path);
 InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);
try {
  if (!mutex.acquire(time, timeUnit)) {
   throw new ZkLockException("acquire zk lock return false");
  }
 } catch (Exception var13) {
  throw new ZkLockException("acquire zk lock failed.", var13);
 }
T var8;
 try {
  var8 = zkLockCallback.doInLock();
 } finally {
  this.releaseLock(finalPath, mutex);
 }
return var8;
}
private void releaseLock(String finalPath, InterProcessMutex mutex) {
 try {
  mutex.release();
  this.logger.info("delete zk node path:{}", finalPath);
  this.deleteInternal(finalPath);
 } catch (Exception var4) {
  this.logger.error("dlock", "release lock failed, path:{}", finalPath, var4);
//   LogUtil.error(this.logger, "dlock", "release lock failed, path:{}", new Object[]{finalPath, var4});
 }
}
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {
 String finalPath = this.getLockPath(path);
 InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);
try {
  if (!mutex.acquire(time, timeUnit)) {
   throw new ZkLockException("acquire zk lock return false");
  }
 } catch (Exception var13) {
  throw new ZkLockException("acquire zk lock failed.", var13);
 }
try {
  zkLockCallback.response();
 } finally {
  this.releaseLock(finalPath, mutex);
 }
}
public String getLockPath(String customPath) {
 if (!StringUtils.startsWith(customPath, "/")) {
  customPath = Constant.keyBuilder(new Object[]{"/", customPath});
 }
String finalPath = Constant.keyBuilder(new Object[]{this.lockRootPath, "", customPath});
 return finalPath;
}
private void deleteInternal(String finalPath) {
 try {
  ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath);
 } catch (Exception var3) {
  this.logger.info("delete zk node path:{} failed", finalPath);
 }
}
public void del(String customPath) {
 String lockPath = "";
try {
  lockPath = this.getLockPath(customPath);
  ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath);
 } catch (Exception var4) {
  this.logger.info("delete zk node path:{} failed", lockPath);
 }
}
}

@FunctionalInterface
public interface ZkLockCallback<T> {
T doInLock();
}
@FunctionalInterface
public interface ZkVoidCallBack {
void response();
}
public class ZkLockException extends Exception {
public ZkLockException() {
}
public ZkLockException(String message) {
 super(message);
}
public ZkLockException(String message, Throwable cause) {
 super(message, cause);
}
}

配置CuratorConfig


@Configuration
public class CuratorConfig {
@Value("${zk.connectionString}")
private String connectionString;
@Value("${zk.sessionTimeoutMs:500}")
private int sessionTimeoutMs;
@Value("${zk.connectionTimeoutMs:500}")
private int connectionTimeoutMs;
@Value("${zk.dLockRoot:/dLock}")
private String dLockRoot;
@Bean
public CuratorFactoryBean curatorFactoryBean() {
 return new CuratorFactoryBean(connectionString, sessionTimeoutMs, connectionTimeoutMs);
}
@Bean
@Autowired
public DLock dLock(CuratorFramework client) {
 return new DLock(dLockRoot, client);
}
}

测试代码


@RestController
@RequestMapping("/dLock")
public class LockController {
@Autowired
private DLock dLock;
@RequestMapping("/lock")
public Map testDLock(String no){
 final String path = Constant.keyBuilder("/test/no/", no);
 Long mutex=0l;
 try {
  System.out.println("在拿锁:"+path+System.currentTimeMillis());
   mutex = dLock.mutex(path, () -> {
   try {
    System.out.println("拿到锁了" + System.currentTimeMillis());
    Thread.sleep(10000);
    System.out.println("操作完成了" + System.currentTimeMillis());
   } finally {
    return System.currentTimeMillis();
   }
  }, 1000, TimeUnit.MILLISECONDS);
 } catch (ZkLockException e) {
  System.out.println("拿不到锁呀"+System.currentTimeMillis());
 }
 return Collections.singletonMap("ret",mutex);
}
@RequestMapping("/dlock")
public Map testDLock1(String no){
 final String path = Constant.keyBuilder("/test/no/", no);
 Long mutex=0l;
 try {
  System.out.println("在拿锁:"+path+System.currentTimeMillis());
  InterProcessMutex processMutex = dLock.mutex(path);
  processMutex.acquire();
  System.out.println("拿到锁了" + System.currentTimeMillis());
  Thread.sleep(10000);
  processMutex.release();
  System.out.println("操作完成了" + System.currentTimeMillis());
 } catch (ZkLockException e) {
  System.out.println("拿不到锁呀"+System.currentTimeMillis());
  e.printStackTrace();
 }catch (Exception e){
  e.printStackTrace();
 }
 return Collections.singletonMap("ret",mutex);
}
@RequestMapping("/del")
public Map delDLock(String no){
 final String path = Constant.keyBuilder("/test/no/", no);
 dLock.del(path);
 return Collections.singletonMap("ret",1);
}
}

以上所述是小编给大家介绍的Java(SpringBoot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助.

来源:https://blog.csdn.net/LJY_SUPER/article/details/87807091

标签:Java,SpringBoot,zookeeper,分布式锁
0
投稿

猜你喜欢

  • maven中配置项目的jdk版本无效的排查方式

    2023-07-18 21:43:42
  • Java数组与堆栈相关知识总结

    2023-11-12 06:12:18
  • Android实现人脸支付的示例代码

    2023-07-30 14:23:52
  • java 一个类实现两个接口的案例

    2023-08-09 12:24:35
  • SpringBoot微信消息接口配置详解

    2023-08-23 09:51:21
  • C#中32位浮点数Float(Real)一步步按位Bit进行分析

    2023-07-19 16:01:20
  • Java 线程的优先级(setPriority)案例详解

    2023-11-12 23:46:39
  • JPA save()方法将字段更新为null的解决方案

    2023-10-28 22:29:28
  • 探讨:android项目开发 统筹兼顾 需要考虑的因素

    2023-08-05 11:21:32
  • @CacheEvict 清除多个key的实现方式

    2023-11-21 08:28:04
  • 聊一聊Java的JVM类加载机制

    2023-11-08 03:58:37
  • 关于java 图形验证码的解决方法

    2023-08-09 15:21:19
  • Java面试题冲刺第二十四天--并发编程

    2023-08-31 05:39:02
  • java锁synchronized面试常问总结

    2023-08-01 05:11:37
  • JavaApi实现更新删除及读取节点

    2023-11-10 07:30:33
  • C# GDI+实现时钟表盘

    2023-06-20 07:11:32
  • Java如何跳过https的ssl证书验证详解

    2023-08-24 11:34:56
  • java实现鲜花销售系统

    2023-08-29 20:23:42
  • Android中外接键盘的检测的实现

    2023-07-27 21:15:13
  • 详解备忘录模式及其在Java设计模式编程中的实现

    2023-08-24 22:34:02
  • asp之家 软件编程 m.aspxhome.com