多线程对分布式值进行自增+Redission的分布式锁源码解读
背景
最近入职了一家新公司,有一道题需要使用redis-lock解决分布式并发问题,所以这篇文章主要研究一下redis-lock是如何实现的。
首先,题目以及解析如下:
// 分布式的Lock,写一个多线程,创建一个Map给Map添加一个key-value,三个线程同时对这个key的value进行递增,保证线程安全。
@Test
public void testRedisson() throws InterruptedException {
RLock lock = Redisson.getLock("lock1");
RMapCache map = Redisson.getCachedMap("count2");
map.put("count2", 0);
for (int i = 0; i {
for (int j = 0; j
- tryAcquire尝试获取锁,如果获取到返回true
- 获取不到锁说明锁被占用了,订阅解锁消息通知
- 收到解锁消息通知,再次尝试获取锁,如果获取不到重复步骤三,直到超过waitTime获取锁失败
- 不论是否获取锁成功,取消解锁消息订阅。
```java
// 在waitTime时间范围内尝试获取锁,如果获取到锁,则设置锁过期时间leaseTime
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 第一步:尝试获取锁
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
// ttl为空说明获取到了锁
if (ttl == null) {
return true;
} else {
// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - current;
if (time subscribeFuture = this.subscribe(threadId);
try {
// 订阅锁释放消息,等待时间超过waitTime,获取锁失败
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
// 如果订阅解锁Future在执行中,等任务执行完后取消订阅锁释放
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
// 取消订阅解锁通知
this.unsubscribe(res, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException var22) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
boolean var16;
try {
// 判断尝试获取锁以及订阅解锁消息的时间是否超过waitTime
time -= System.currentTimeMillis() - current;
if (time = 0L && ttl 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
} finally {
// 第四步:取消解锁订阅this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
}
return var16;
}
}
}
抢锁代码
下面是加锁的代码实现,包括抢锁以及看门狗的实现
private RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 这里需要注意的是leaseTime==-1,会触发redisson看门狗机制,此处的方法使用的是lua脚本
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// 获取锁成功
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 锁自动续时(看门狗机制)触发条件leaseTime == -1;此处底层也是lua脚本,下面会详述
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
抢锁的lua脚本
在下列代码中的lua脚本操作是原子的,其实lua脚本本身不是原子的,但是redis在执行lua脚本的操作是原子的。
// 下列代码中变量含义
// KEYS[1] = "锁key"
// ARGV[1] = "锁过期时间"
// ARGV[2] = "当前连接的UUID + : + 线程id“ 对应后面的getLockName方法返回
RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// 如果key一开始就不存在,则直接创建一个key
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 使用 `HINCRB。Y` 将哈希表 `KEYS[1]` 中的字段 `ARGV[2]`(表示当前线程的锁计数器)加 1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 设置该锁的过期时间为 `ARGV[1]`(即 `leaseTime` 转换为毫秒)。
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 这里是重入锁的实现,同一个线程多次获取锁只需要在value加1即可,value相当于一个加锁计数器
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// `HEXISTS` 用于检查当前线程(由 `ARGV[2]` 标识)是否已经持有锁。
// 如果线程已经持有锁(即锁为重入),通过 `HINCRBY` 将计数器加 1,并重置锁的过期时间。
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 有其他线程持有锁,加锁失败,返回锁过期时间
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
看门狗的实现
续时间的方法解读
protected void scheduleExpirationRenewal(long threadId) {
// 保存当前加锁key有那些线程自动续时,取消自动续时后会清除此对象内部数据
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 更新锁过期时间
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
此处是更新时间操作:首先判断锁是否占用,如果占用,则使用定时任务对当前锁进行续时。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 定时任务(可以搜io.netty.util.HashedWheelTimer)
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 更新锁过期时间(lua脚本)
RFuture future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 更新锁过期时间成功
if (res) {
// 递归调用 如果10秒后依然没有解锁,继续更新锁过期时间
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
// internalLockLeaseTime在不设置lockWatchdogTimeout情况下默认30s,这里会延迟10s触发此任务
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
看门狗对应的更新过期时间的lua脚本
protected RFuture renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 当前线程已持有锁,更新锁过期时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
unLock源码解读
public void unlock() {
try {
// 解锁
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
unlockAsync()方法内部会调用lua解锁脚本
protected RFuture unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
// 推送解锁通知
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
加解锁的订阅发布类
在redissonLock中会有一个LockPubSub的实体类,这个就是消息发布订阅对应的实体,
可以看到此实体类,在发现消息时,会比较是否是释放锁的消息,如果是则将队列中的任务进行释放,通过回调的runnable来通知等待的线程
public class LockPubSub extends PublishSubscribe {
public static final Long UNLOCK_MESSAGE = 0L;
public static final Long READ_UNLOCK_MESSAGE = 1L;
public LockPubSub(PublishSubscribeService service) {
super(service);
}
protected RedissonLockEntry createEntry(CompletableFuture newPromise) {
return new RedissonLockEntry(newPromise);
}
protected void onMessage(RedissonLockEntry value, Long message) {
Runnable runnableToExecute;
if (message.equals(UNLOCK_MESSAGE)) {
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while(true) {
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute == null) {
value.getLatch().release(value.getLatch().getQueueLength());
break;
}
runnableToExecute.run();
}
}
}
}
总结
加锁操作:
-
tryAcquire尝试获取锁,会通过lua脚本判断是否有锁占用,如果已经有锁占用则判断是否是当前客户端,如果是当前客户端,则说明是可重入的锁,当前客户端可以继续获得这把锁,如果获取到返回true。如果没抢到当前锁,则使用看门狗定时任务,每隔10s对持有锁加上过期时间。
-
获取不到锁说明锁被占用了,订阅解锁消息通知
-
收到解锁消息通知,再次尝试获取锁,如果获取不到重复步骤三,直到超过waitTime获取锁失败
-
不论是否获取锁成功,取消解锁消息订阅。
解锁操作:
-
发布解锁消息通知。
-
监听器判断为解锁操作时,执行等待队列中的任务,放开对应的锁。
加解锁消息通知
-
使用的是定义的LockPubSub进行通讯。
-
加锁时订阅此通道,
-
解锁时在此通道发布消息。
评论
暂无公开评论。