ZooKeeper 实现分布式锁的方法示例

作者:Beck's Blog 时间:2023-03-20 07:26:43 

ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。

节点

在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。

Znode 又分为以下四种类型:


类型描述
持久节点节点创建后,会一直存在,不会因客户端会话失效而删除
持久顺序节点基本特性与持久节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名
临时节点客户端会话失效或连接关闭后,该节点会被自动删除
临时顺序节点基本特性与临时节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名

锁原理

ZooKeeper 分布式锁是基于 临时顺序节点 来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。

而且用临时顺序节点的另外一个用意是如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。

ZooKeeper 实现分布式锁的方法示例

如上图:ClientA 和 ClientB 同时想获取锁,所以都在 locks 节点下创建了一个临时节点 1 和 2,而 1 是当前 locks 节点下排列序号第一的节点,所以 ClientA 获取锁成功,而 ClientB 处于等待状态,这时 ZooKeeper 中的 2 节点会监听 1 节点,当 1节点锁释放(节点被删除)时,2 就变成了 locks 节点下排列序号第一的节点,这样 ClientB 就获取锁成功了。

代码测试

请确保 ZooKeeper 服务已启动,ZooKeeper 的搭建可参考Kafka 集群 中的 ZooKeeper 集群部分

以下是基于 C# 的测试,Java 可使用 Curator 框架,实现原理和上面描述是一致的,有兴趣可以看看源码,应该也不难理解。

创建 .NET Core 控制台程序 Nuget

安装 ZooKeeperNetEx.Recipes

创建 ZooKeeper Client


private const int CONNECTION_TIMEOUT = 50000;
private const string CONNECTION_STRING = "127.0.0.1:2181";
private ZooKeeper CreateClient()
{
var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance);
Stopwatch sw = new Stopwatch();
sw.Start();
while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT)
{
var state = zooKeeper.getState();
if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING)
{
break;
}
}
sw.Stop();
return zooKeeper;
}

class NullWatcher : Watcher
 {
   public static readonly NullWatcher Instance = new NullWatcher();
   private NullWatcher() { }
   public override Task process(WatchedEvent @event)
   {
     return Task.CompletedTask;
   }
 }

添加 Lock 方法


/// <summary>
/// 加锁
/// </summary>
/// <param name="key">加锁的节点名</param>
/// <param name="lockAcquiredAction">加锁成功后需要执行的逻辑</param>
/// <param name="lockReleasedAction">锁释放后需要执行的逻辑,可为空</param>
/// <returns></returns>
public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null)
{
// 获取 ZooKeeper Client
ZooKeeper keeper = CreateClient();
// 指定锁节点
WriteLock writeLock = new WriteLock(keeper, $"/{key}", null);

var lockCallback = new LockCallback(() =>
{
lockAcquiredAction.Invoke();
writeLock.unlock();
}, lockReleasedAction);
// 绑定锁获取和释放的监听对象
writeLock.setLockListener(lockCallback);
// 获取锁(获取失败时会监听上一个临时节点)
await writeLock.Lock();
}

class LockCallback : LockListener
{
private readonly Action _lockAcquiredAction;
private readonly Action _lockReleasedAction;

public LockCallback(Action lockAcquiredAction, Action lockReleasedAction)
{
_lockAcquiredAction = lockAcquiredAction;
_lockReleasedAction = lockReleasedAction;
}

/// <summary>
/// 获取锁成功回调
/// </summary>
/// <returns></returns>
public Task lockAcquired()
{
_lockAcquiredAction?.Invoke();
return Task.FromResult(0);
}

/// <summary>
/// 释放锁成功回调
/// </summary>
/// <returns></returns>
public Task lockReleased()
{
_lockReleasedAction?.Invoke();
return Task.FromResult(0);
}
}

多线程模拟测试


static async Task RunAsync()
{
Parallel.For(1, 10, async (i) =>
{
await new ZooKeeprDistributedLock().Lock("locks", () =>
{
Console.WriteLine($"第{i}个请求,获取锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000); // 业务逻辑...
}, () =>
{
Console.WriteLine($"第{i}个请求,释放锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("-------------------------------");
});
});
await Task.CompletedTask;
}

ZooKeeper 实现分布式锁的方法示例

虽然模拟的是多线程并行执行,但最终都会依赖锁的获取和释放而串行执行实际业务逻辑。

来源:http://beckjin.com/2019/06/16/zookeeper-lock/

标签:ZooKeeper,分布式锁
0
投稿

猜你喜欢

  • 在C# WPF下自定义滚动条ScrollViewer样式的操作

    2022-09-17 16:55:28
  • IntelliJ IDEAx导出安卓(Android)apk文件图文教程

    2022-06-22 18:26:16
  • spring-cloud入门之eureka-client(服务注册)

    2023-12-16 22:42:51
  • Android Studio 报错failed to create jvm error code -4的解决方法

    2023-01-22 03:13:49
  • Mybatis全局配置及映射关系的实现

    2022-07-03 09:31:39
  • Android编程实现获取当前连接wifi名字的方法

    2023-11-24 15:41:50
  • 详解Spring依赖注入的三种方式使用及优缺点

    2023-06-09 18:29:00
  • Kotlin数据容器深入讲解

    2022-03-28 05:19:34
  • C#中的yield关键字详解

    2023-01-11 08:19:01
  • 深入理解c# checked unchecked 关键字

    2021-11-29 01:25:09
  • Unity3D实现导航效果

    2022-09-26 02:15:54
  • Unity3d实现跑马灯广播效果

    2022-11-13 20:08:12
  • Android实现语音播放与录音功能

    2022-01-21 15:39:09
  • Android 使用Vibrator服务实现点击按钮带有震动效果

    2023-06-15 01:57:16
  • 举例讲解Java设计模式编程中Decorator装饰者模式的运用

    2023-01-13 03:17:00
  • Java类锁、对象锁、私有锁冲突测试

    2022-04-25 05:06:22
  • Android实现左侧滑动菜单

    2022-10-10 14:58:41
  • Springboot整合微信支付(订单过期取消及商户主动查单)

    2023-05-15 23:40:50
  • Java爬取豆瓣电影数据的方法详解

    2021-12-12 16:21:06
  • 教你使用java实现去除各种空格

    2022-09-21 21:27:07
  • asp之家 软件编程 m.aspxhome.com