前言
分布式鎖一般有三種實(shí)現(xiàn)方式:1. 數(shù)據(jù)庫樂觀鎖;2. 基于Redis的分布式鎖;3. 基于ZooKeeper的分布式鎖。本篇博客將介紹第二種方式,基于Redis實(shí)現(xiàn)分布式鎖。雖然網(wǎng)上已經(jīng)有各種介紹Redis分布式鎖實(shí)現(xiàn)的博客,然而他們的實(shí)現(xiàn)卻有著各種各樣的問題,為了避免誤人子弟,本篇博客將詳細(xì)介紹如何正確地實(shí)現(xiàn)Redis分布式鎖。
可靠性
首先,為了確保分布式鎖可用,我們至少要確保鎖的實(shí)現(xiàn)同時滿足以下四個條件:
1.互斥性。在任意時刻,只有一個客戶端能持有鎖。
2.不會發(fā)生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續(xù)其他客戶端能加鎖。
3.具有容錯性。只要大部分的Redis節(jié)點(diǎn)正常運(yùn)行,客戶端就可以加鎖和解鎖。
4.解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。
使用Redis的SETNX命令獲取分布式鎖的步驟:
•C1和C2線程同時檢查時間戳獲取鎖,執(zhí)行SETNX命令并都返回0,此時鎖仍被C3持有,并且C3已經(jīng)崩潰
•C1 DEL鎖
•C1 使用SETNX命令獲取鎖,并且成功
•C2 DEL鎖
•C2 使用SETNX命令獲取鎖,并且成功
•ERROR : 由于競態(tài)條件,C1和C2都獲取到了鎖
幸運(yùn)的是,以下面的步驟完全可以避免這種情況發(fā)生,看看C4線程如何操作
•C4使用SETNX命令獲取鎖
•C3已經(jīng)崩潰但是仍然持有鎖,所以Redis返回0給C4
•C4使用GET命令獲取鎖并檢查鎖是否已經(jīng)過期,如果沒有過期,則繼續(xù)等待一段時間并重新重試
•如果鎖已經(jīng)過期,C4嘗試 GETSET lock.foo current Unix timestamp + lock timeout + 1>
•利用GETSET語法,C4可以檢查舊時間是否仍然是過期時間,如果是,則獲取鎖
•如果另一個客戶端C5率先獲取到鎖,C4執(zhí)行GETSET命令后將返回非過期時間,然后C4繼續(xù)從頭開始重新嘗試獲取鎖。此操作C4將延長一點(diǎn)C5獲取到的鎖的過期時間,不過這不是什么大問題。
接下來我們用代碼的形式展現(xiàn):
package com.shuige.components.cache.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* Description: 通用Redis幫助類
* User: zhouzhou
* Date: 2018-09-05
* Time: 15:39
*/
@Component
public class CommonRedisHelper {
public static final String LOCK_PREFIX = "redis_lock";
public static final int LOCK_EXPIRE = 300; // ms
@Autowired
RedisTemplate redisTemplate;
/**
* 最終加強(qiáng)分布式鎖
*
* @param key key值
* @return 是否獲取到
*/
public boolean lock(String key){
String lock = LOCK_PREFIX + key;
// 利用lambda表達(dá)式
return (Boolean) redisTemplate.execute((RedisCallback) connection -> {
long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1;
Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(expireAt).getBytes());
if (acquire) {
return true;
} else {
byte[] value = connection.get(lock.getBytes());
if (Objects.nonNull(value) value.length > 0) {
long expireTime = Long.parseLong(new String(value));
if (expireTime System.currentTimeMillis()) {
// 如果鎖已經(jīng)過期
byte[] oldValue = connection.getSet(lock.getBytes(), String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes());
// 防止死鎖
return Long.parseLong(new String(oldValue)) System.currentTimeMillis();
}
}
}
return false;
});
}
/**
* 刪除鎖
*
* @param key
*/
public void delete(String key) {
redisTemplate.delete(key);
}
}
如何使用呢,導(dǎo)入工具類后:
boolean lock = redisHelper.lock(key);
if (lock) {
// 執(zhí)行邏輯操作
redisHelper.delete(key);
} else {
// 設(shè)置失敗次數(shù)計數(shù)器, 當(dāng)?shù)竭_(dá)5次時, 返回失敗
int failCount = 1;
while(failCount = 5){
// 等待100ms重試
try {
Thread.sleep(100l);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (redisHelper.lock(key)){
// 執(zhí)行邏輯操作
redisHelper.delete(key);
}else{
failCount ++;
}
}
throw new RuntimeException("現(xiàn)在創(chuàng)建的人太多了, 請稍等再試");
}
加鎖成功執(zhí)行完邏輯后, 必須解鎖, 否則只能靠鎖機(jī)制來解鎖了不建議這么做
總結(jié)
以上所述是小編給大家介紹的Redis Template實(shí)現(xiàn)分布式鎖的實(shí)例代碼,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
您可能感興趣的文章:- redis分布式鎖的go-redis實(shí)現(xiàn)方法詳解
- 詳解一種用django_cache實(shí)現(xiàn)分布式鎖的方式
- go如何利用orm簡單實(shí)現(xiàn)接口分布式鎖
- mongo分布式鎖Java實(shí)現(xiàn)方法(推薦)
- 淺談Redis分布式鎖的正確實(shí)現(xiàn)方式
- Java使用Redisson分布式鎖實(shí)現(xiàn)原理
- Go 語言下基于Redis分布式鎖的實(shí)現(xiàn)方式