Go语言使用Etcd实现分布式锁
作者:small_to_large 时间:2024-04-23 09:37:33
1 分布式锁概述
谈到分布式锁,必然是因为单机锁无法满足要求,在现阶段微服务多实例部署的情况下,单机语言级别的锁,无法满足并发互斥资源的安全访问。常见的单机锁如Java的jvm锁Lock
、synchronized
,golang的Mutex
等 对于分布式锁有很多种实现方式,常见的有以下几种:
基于数据库:通过数据库事务锁例如
for update
操作基于缓存中间件:redis分布式锁、etcd分布式锁等
基于ZK临时节点:zookeeper 临时节点实现分布式锁
每种方式实现的分布式锁各有优缺点简单介绍一下:
数据库实现不用额外引入新的中间件,减少系统的依赖性和不稳定性,但性能不会太高,且并发量大时,对数据库压力比较大。
ZK实现分布式,因为zk满足了CP,能够保证其数据一致性,不会出现加锁成功后又丢失的问题,但相反性能会降低,并且可用性降低 CAP
A
不是满足的,详细可以自行了解zk细节redis 实现:最大的优点性能高,能保证AP,保证其高可用。但无法保证一致性,因为redis满足的是AP,可能存在某一个时间节点集群数据S-M同步不一致。
2 分布式锁要点
实现分布式锁需要满足一下几点:
锁载体:redis 受用 K-V 键值作为锁载体,ZK使用临时节点作为载体
锁租期:进程持有分布式锁后不能一直占用,如果因为宕机情况造成锁释放失败,就会一直占用,reds 可以设置过期时间,zk临时节点也会自动删除。
其他要求:比如减少惊群效应、可重入机制、公平锁机制,不同的实现方式有的不能完全满足。
分布式锁选择:
qps不大的情况下,那种方式都可以
结合目前技术体系,在不引入新的技术中间件情况下解决问题
qps并发极高,但容忍极少的数据丢失或者不一致,建议使用redis实现分布式锁
如果业务要求任何情况下都不允许数据丢失,可以使用zk或者etcd实现
3 Etcd 实现机制
锁载体: 使用 k-v 结构实现
锁租期: Etcd 通过
lease
可以对 kv 设置租约,当租约到期,kv 将失效删除;避免长时间占用锁不释放放。自动续期: Etcd 可以对租约进行自动续期,通过
KeepAlive
实现公平锁: 多个程序同时抢锁时,会根据
Revision
值大小依次获得锁,可以有效避免 “惊群效应”,公平获取。Watch 机制: 监听机制,Watch 机制支持 Watch 某个固定的 key或者目录, key 或目录发生变化,客户端可以收到通知。
4 代码实现
操作步骤:
初始化客户端
创建一个session并设置默认租期30s
获取指定前缀的锁对象
加锁
执行业务
释放锁
代码:
package main
import (
"context"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"log"
"time"
)
func main() {
// 初始化客户端
log.Println("客户端初始化")
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}, DialTimeout: time.Second * 3})
if err != nil {
log.Fatalf("客户端初始化失败:%v\n", err)
}
// 创建一个session并设置默认租期30s,即锁默认超过30s会自动释放(内部会自动续期Etcd KeepAlive)
log.Println("Session初始化")
session, err := concurrency.NewSession(client, concurrency.WithTTL(30))
if err != nil {
log.Fatalf("Session初始化失败:%v\n", err)
return
}
defer func(session *concurrency.Session) {
err := session.Close()
if err != nil {
log.Fatalf("Session关闭失败:%v\n", err)
}
}(session)
// 获取指定前缀的锁对象
mutex := concurrency.NewMutex(session, "my-lock")
// 加锁默认等待3s
log.Println("TryLock加锁失败不会等待")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
err = mutex.TryLock(ctx)
if err != nil {
log.Fatalf("加锁失败立即返回:%v\n", err)
return
}
//log.Println("加锁最多等待3s")
//ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
//defer cancel()
//err = mutex.Lock(ctx)
//if err != nil {
//log.Fatalf("加锁失败:%v\n", err)
//return
//}
// Exe biz
log.Println("加锁成功开始执行业务")
for i := 1; i <= 10; i++ {
time.Sleep(time.Second)
log.Printf("执行 %%%d ...", i*10)
}
// 释放锁
err = mutex.Unlock(context.TODO())
if err != nil {
log.Fatalf("释放锁失败:%v\n", err)
return
}
log.Println("释放锁完成")
}
测试结果
来源:https://juejin.cn/post/7234709239537418300