分布式锁初见

介绍几种常见的分布式锁写法

多线程中为了防止多个线程同时执行同一段代码,们可以用 synchronized 关键字或 JUC 里面的 ReentrantLock 类来控制,

但是目前几乎任何一个系统都是部署多台机器的,单机部署的应用很少,synchronized 和 ReentrantLock 发挥不出任何作用,

此时就需要一把全局的锁,来代替 JAVA 中的 synchronized 和 ReentrantLock。 分布式锁的实现方式流行的主要有三种

  1. 分别是基于缓存 Redis 的实现方式
  2. 基于 zk 临时顺序节点的实现
  3. 基于数据库行锁的实现。

官网

目录 · redisson/redisson Wiki · GitHub

Jedis

使用 Redis 做分布式锁的思路是: 在 redis 中设置一个值表示加了锁,然后释放锁的时候就把这个 key 删除。

    /**
     * 尝试获取分布式锁
     *
     * @param jedis      Redis客户端
     * @param lockKey    锁
     * @param requestId  请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
        // set支持多个参数 NX(not exist) XX(exist) EX(seconds) PX(million seconds)
        String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime);
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
    /**
     * 释放分布式锁
     *
     * @param jedis     Redis客户端
     * @param lockKey   锁
     * @param requestId 请求标识,当前工作线程线程的名称
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

问题

  • SET key value ,而没有使用 SETNX+EXPIRE 的命令,原因是 SETNX+EXPIRE 是两条命令无法保证原子性,而 SET 是原子操作。
  • 那这里为什么要设置超时时间呢? 原因是当一个客户端获得了锁在执行任务的过程中挂掉了,来不及显式地释放锁,这块资源将会永远被锁住, 这将会导致死锁,所以必须设置一个超时时间
  • A 加的锁 B 不能去 del 掉,谁加的锁就谁去解,们一般把 value 设为当前线程的 Id, Thread.currentThread().getId(),然后在删的时候判断下是不是当前线程。
  • 验证和释放锁是两个独立操作,不是原子性, 使用 Lua 脚本,即 if redis.call('get', KEYS[1]) == ARGV[1] then returnredis.call('del', KEYS[1]) else return 0 end,它能给们保证原子性。

    当 redis.call() 在执行命令的过程中发生错误时,脚本会停止执行,并返回一个脚本错误,

    错误的输出信息会说明错误造成的原因:

Redisson

Redisson 是 Java 的 Redis 客户端之一,提供了一些 API 方便操作 Redis。 Redisson 跟 Jedis 定位不同,它不是一个单纯的 Redis 客户端,

而是基于 Redis 实现的分布式的服务,

锁只是它的冰山一角,并且它对主从,哨兵,集群等模式都支持。

public class LockTest{
    private static Redis sonClient redissonClient;

    static {
        Config config = new Config();
        config. useSingleServer().setAddress("redis://127.0.0.1:6379");
        redissonClient = Redisson.create ( config);
    }
    public static void main(String[] args) throws InterruptedException {
        RLock rLock = redissonClient . getLock("zwt" );
        //最多等待100秒、 上锁10s以后自动解锁
        if (rLock.tryLock(100, 10, TimeUnit. SECONDS)) {
            System . out . println("获取锁成功" );
        }
        //Thread. sleep(20000);
        rLock. unlock( );
        redissonClient . shutdown();
    }
}

这里获取锁有很多种的方式,有公平锁有读写锁,们使用的是 redissonClient.getLock, 这是一个可重入锁。

在加锁的时候,写入了一个 HASH 类型的值,key 是锁名称 zwt,field 是线程的名称,而 value 是 1(即表示锁的重入次数)。

点进 tryLock() 方法的 tryAcquire() 方法,再到->tryAcquireAsync() 再到->tryLockInnerAsync(),

终于见到庐山真面目了,原来它最终也是通过 Lua 脚本来实现的。

<T> RFuture<T> tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit. toMillis( leaseTime);
    return commandExecutor . evalWriteAsync getName(), LongCodec . INSTANCE, command,
                "if (redis.call( 'exists', KEYS[1]) == 0) then" +
                    "redis.call('hset', KEYS[1], ARGV[2], 1);" + 
                    "redis. call('pexpire', KEYS[1], ARGV[1]);" +
                    "return nil;" +
                "end; " + 
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
                    "redis. call( 'hincrby', KEYS[1], ARGV[2], 1);" +
                    "redis. call( 'pexpire', KEYS[1], ARGV[1]);" +
                    "return nil;" +
                "end;" +
                "return redis.call('pttl', KEYS[1]);",
                Collections.<~>singLetonL ist(getName()), internalLockLeaseTime, getLockName (threadId));
}
Lua脚本拉出来分析一下:
// KEYS[1] 锁名称 updateAccount
// ARGV[1] key 过期时间 10000ms
// ARGV[2] 线程名称
// 锁名称不存在
if (redis.call('exists', KEYS[1]) == 0) then
// 创建一个 hash,key=锁名称,field=线程名,value=1
redis.call('hset', KEYS[1], ARGV[2], 1);
// 设置 hash 的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 锁名称存在,判断是否当前线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
// 如果是,value+1,代表重入次数+1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
// 重新获得锁,需要重新设置 Key 的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 锁存在,但是不是当前线程持有,返回过期时间(毫秒)
return redis.call('pttl', KEYS[1]);
unlock() 中的 unlockInnerAsync() 释放锁,同样也是通过 Lua 脚本实现。
// KEYS[1] 锁的名称 updateAccount
// KEYS[2] 频道名称 redisson_lock__channel:
// ARGV[1] 释放锁的消息 0
// ARGV[2] 锁释放时间 10000
// ARGV[3] 线程名称
// 锁不存在(过期或者已经释放了)
if (redis.call('exists', KEYS[1]) == 0) then
// 发布锁已经释放的消息
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
// 锁存在,但是不是当前线程加的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
// 锁存在,是当前线程加的锁
// 重入次数-1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
// -1 后大于 0,说明这个线程持有这把锁还有其他的任务需要执行
if (counter > 0) then
// 重新设置锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
// -1 之后等于 0,现在可以删除锁了
redis.call('del', KEYS[1]);
// 删除之后发布释放锁的消息
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
// 其他情况返回 nil
return nil;

问题

  • 业务没执行完,锁到期了怎么办,这个由watchdog来保障。
  • 集群模式下,如果对多个master加锁,导致重复加锁怎么办,Redission会自动选择同一个 master。
  • 业务没执行完,Redis master挂掉了怎么办,没关系,Redis slave还有这个数据。

RedLock

名字由来 RedLock 的中文是直译过来的,就叫红锁。

红锁并非是一个工具,而是 Redis 官方提出的一种分布式锁的算法

们知道如果采用单机部署模式,会存在单点问题,只要 redis 故障了,加锁就不行了。 如果采用 master-slave 模式,加锁的时候只对一个节点加锁,

即便通过 sentinel 做了高可用,但是如果 master 节点故障了,发生主从切换,

此时就会有可能出现锁丢失的问题。

基于以上的考虑,其实 redis 的作者 Antirez 也考虑到这个问题,他提出了一个 RedLock 的算法。

算法实现

通过以下步骤获取一把锁:
1.获取当前时间戳,单位是毫秒
2.轮流尝试在每个 master 节点上创建锁,过期时间设置较短,一般就几十毫秒
3.尝试在大多数节点上建立一个锁,比如5个节点就要求是3个节点(n / 2 +1)
4.客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了
5.要是锁建立失败了,那么就依次删除这个锁
6.只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁
RLock lock1 = redissonInstance1. getLock("lock1");
RLock lock2 = redissonInstance2. getLock("lock2");
RLock 1ock3 = redissonInstance3. getLock("lock3");
RedissonRedLock lock = new RedissonRedlock(lock1, lock2, lock3);
//同时加锁。lock1 lock2 lock3
//红锁在大部分节点上加锁成功就算成功。
lock.lock();
//............
lock.unlock();

Zookeeper写法(Curator)

获取锁

Client1 得到了锁,Client2 监听了 Lock1,Client3 监听了 Lock2。这恰恰形成了一个等待队列

释放锁 1.任务完成,客户端显示释放

当任务完成时,Client1 会显示调用删除节点 Lock1 的指令。

2.任务执行过程中,客户端崩溃

获得锁的 Client1 在任务执行过程中,如果 Duang 的一声崩溃,则会断开与 Zookeeper 服务端的链接。

根据临时节点的特性,相关联的节点 Lock1 会随之自动删除。 Client2 一直监听着 Lock1 的存在状态,当 Lock1 节点被删除,Client2 会立刻收到通知。

这时候 Client2 会再次查询 ParentLock 下面的所有节点,确认自己创建的节点 Lock2 是不是目前最小的节点。

如果是最小,则 Client2 顺理成章获得了锁。

Curator

在 Apache 的开源框架 Apache Curator 中,包含了对 Zookeeper 分布式锁的实现。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>

Curator的几种锁的实现

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
    public class ZkDistributedLock implements DistributedLock {
    private final CuratorF ramework client;
    public ZkDistributedLock ( CuratorFramework client) { this.client = client; }
    @override
    public void acquire(String key) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(client, key);
    lock.acquire();
    }
    @Override
    public boolean acquire(String key, long maxwait, TimeUnit waitunit) throws Exception{
        InterProcessMutex lock = new InterProcessMutex(client, key);
        return lock.acquire (maxWait, waitUnit);
    }
    @Override
    public void release(String key) throws Exception {
        InterProcessMutex lock = new InterProcessMutex(client, key);
        lock. release();
    }
    }
    

总结

zookeeper 天生设计定位就是分布式协调,强一致性,锁很健壮。

如果获取不到锁,只需要添加一个监听器就可以了,不用一直轮询,性能消耗较小。 缺点: 在高请求高并发下,系统疯狂的加锁释放锁,最后 zk 承受不住这么大的压力可能会存在宕机的风险。

zk 锁性能比 redis 低的原因:zk 中的角色分为 leader,flower,

每次写请求只能请求 leader,leader 会把写请求广播到所有 flower,

如果 flower 都成功才会提交给 leader,在加锁的时候是一个写请求,

当写请求很多时,zk 会有很大的压力,最后导致服务器响应很慢。

redis 锁实现简单,理解逻辑简单,性能好,可以支撑高并发的获取、释放锁操作。 缺点: Redis 容易单点故障,集群部署,并不是强一致性的,锁的不够健壮;

​ key 的过期时间设置多少不明确,只能根据实际情况调整;

​ 需要自己不断去尝试获取锁,比较消耗性能。

最后不管 redis 还是 zookeeper,它们都应满足分布式锁的特性:

  • 具备可重入特性(已经获得锁的线程在执行的过程中不需要再次获得锁)
  • 异常或者超时自动删除,避免死锁
  • 互斥性,只有一个客户端能够持有锁
  • 分布式环境下高性能、高可用、容错机制

各有千秋,具体业务场景具体使用。

原文创作:ML李嘉图

原文链接:https://www.cnblogs.com/zwtblog/p/15185894.html

文章列表

更多推荐

更多
  • Java8反应式编程指南-六、使用调度器获得并发性和并行性 RxJava 的调度器,缓冲、节流和去抖动,调试可观察对象及其调度器,可观察的间隔及其默认调度程序,调度器的类型,将可观察对象和调度器相结合,平行性,节流,去抖动,缓冲器和窗口操作器,背压操作人员,调度器。即时调度器,调度员。蹦床调度员
  • Java8反应式编程指南-七、测试 RxJava 应用 使用简单订阅进行测试,阻塞可观测类,聚合运算符和 BlockingObservable 类,使用聚合运算符和 BlockingObservable 类进行测试,使用 TestSubscriber 类进行深入测试,借助 TestSched
  • Java8反应式编程指南-八、资源管理与 RxJava 扩展 资源管理,使用 Observable.cache 缓存数据,使用升降机创建自定义操作员,使用 Observable.compose 运算符组合多个运算符,介绍可观察的使用方法, 通过前面的章节,我们已经学习了如何使用 RxJava
  • Java8反应式编程指南-五、组合器、条件和错误处理 结合可观察实例,条件运算符,处理错误,HTTP 客户端示例,拉链操作员,组合测试操作符,合并运算符,concat 操作员,电磁轴承操作员,takeUntil、takeWhile、skipUntil和 skipWhile条件运算符,def
  • Java8反应式编程指南-四、转换、过滤和积累您的数据 可观测变换,过滤数据,积累数据,使用各种 flatMap 运算符的变换,分组项目,附加有用的变换运算符, 现在我们有了从各种源数据创建`Observable`实例的方法,是时候围绕这些实例构建编程逻辑了。我们将介绍用于实现逐步计算
  • Java8反应式编程指南-三、创建和连接可观察对象、观察者和主体 从方法中观察到的,可观察的、公正的方法,其他可观察的工厂方法,可观察的创建方法,订阅和取消订阅,冷热可观察实例,主体实例,可连通可观测类, RxJava 的`Observable`实例是反应式应用的构建块,RxJava 的这一优势
  • Java8反应式编程指南-一、反应式编程简介 什么是反应式编程?,我们为什么要被动?,介绍 RxJava,下载并设置 RxJava,比较迭代器模式和 RxJava 可观测值, 如今,术语反应式编程正在成为一种趋势。各种编程语言的库和框架正在涌现。关于反应式编程的博客文
  • Java8反应式编程指南-二、使用 Java 8 的函数结构 Java 8 中的 Lambdas,用 lambdas 实现无功和示例,纯函数与高阶函数,引入新的语法和语义,Java 8 和 RxJava 中的功能接口,纯函数,高阶函数,RxJava 与函数式编程, 函数式编程不是一个新概念;
  • Java8反应式编程指南-零、序言 这本书涵盖的内容,这本书你需要什么,这本书是给谁的,公约,读者反馈,客户支持,下载示例代码,勘误表,盗版,问题, 反应式编程已经存在了几十年。从 Smalltalk 还是一种年轻的语言时起,就有一些反应式编程的实现。然而,它只是最
  • Go编程秘籍-三、数据转换与组合 本章将展示一些在数据类型之间转换、使用非常大的数字、使用货币、使用不同类型的编码和解码(包括 Base64 和gob)以及使用闭包创建自定义集合的示例。转换数据类型和接口转换,使用 math 和 math/big ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多