主頁 > 知識庫 > Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法

Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法

熱門標(biāo)簽:北京400電話辦理收費標(biāo)準(zhǔn) 山東外呼銷售系統(tǒng)招商 貴州電銷卡外呼系統(tǒng) 魔獸2青云地圖標(biāo)注 十堰營銷電銷機器人哪家便宜 日本中國地圖標(biāo)注 宿遷便宜外呼系統(tǒng)平臺 鄭州人工智能電銷機器人系統(tǒng) 超呼電話機器人

分布式鎖概覽

在多線程的環(huán)境下,為了保證一個代碼塊在同一時間只能由一個線程訪問,Java中我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式。但是現(xiàn)在公司都是流行分布式架構(gòu),在分布式環(huán)境下,如何保證不同節(jié)點的線程同步執(zhí)行呢?因此就引出了分布式鎖,它是控制分布式系統(tǒng)之間互斥訪問共享資源的一種方式。

在一個分布式系統(tǒng)中,多臺機器上部署了多個服務(wù),當(dāng)客戶端一個用戶發(fā)起一個數(shù)據(jù)插入請求時,如果沒有分布式鎖機制保證,那么那多臺機器上的多個服務(wù)可能進行并發(fā)插入操作,導(dǎo)致數(shù)據(jù)重復(fù)插入,對于某些不允許有多余數(shù)據(jù)的業(yè)務(wù)來說,這就會造成問題。而分布式鎖機制就是為了解決類似這類問題,保證多個服務(wù)之間互斥的訪問共享資源,如果一個服務(wù)搶占了分布式鎖,其他服務(wù)沒獲取到鎖,就不進行后續(xù)操作。大致意思如下圖所示:

分布式鎖的特點

分布式鎖一般有如下的特點:

  • 互斥性: 同一時刻只能有一個線程持有鎖
  • 可重入性: 同一節(jié)點上的同一個線程如果獲取了鎖之后能夠再次獲取鎖
  • 鎖超時:和J.U.C中的鎖一樣支持鎖超時,防止死鎖
  • 高性能和高可用: 加鎖和解鎖需要高效,同時也需要保證高可用,防止分布式鎖失效
  • 具備阻塞和非阻塞性:能夠及時從阻塞狀態(tài)中被喚醒

分布式鎖的實現(xiàn)方式

我們一般實現(xiàn)分布式鎖有以下幾種方式:

  • 基于數(shù)據(jù)庫
  • 基于Redis
  • 基于zookeeper

Redis普通分布式鎖存在的問題

說到Redis分布式鎖,大部分人都會想到:setnx+lua(redis保證執(zhí)行l(wèi)ua腳本時不執(zhí)行其他操作,保證操作的原子性),或者知道set key value px milliseconds nx。后一種方式的核心實現(xiàn)命令如下:

- 獲取鎖(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 釋放鎖(lua腳本中,一定要比較value,防止誤解鎖)
if redis.call("get",KEYS[1]) == ARGV[1] then 
 return redis.call("del",KEYS[1])
else 
 return 0
end

這種實現(xiàn)方式有3大要點(也是面試概率非常高的地方):

  • set命令要用set key value px milliseconds nx;
  • value要具有唯一性;
  • 釋放鎖時要驗證value值,不能誤解鎖;

事實上這類鎖最大的缺點就是它加鎖時只作用在一個Redis節(jié)點上,即使Redis通過sentinel保證高可用,如果這個master節(jié)點由于某些原因發(fā)生了主從切換,那么就會出現(xiàn)鎖丟失的情況:

  1. 在Redis的master節(jié)點上拿到了鎖;
  2. 但是這個加鎖的key還沒有同步到slave節(jié)點;
  3. master故障,發(fā)生故障轉(zhuǎn)移,slave節(jié)點升級為master節(jié)點;
  4. 導(dǎo)致鎖丟失。

為了避免單點故障問題,Redis作者antirez基于分布式環(huán)境下提出了一種更高級的分布式鎖的實現(xiàn)方式:Redlock。Redlock也是Redis所有分布式鎖實現(xiàn)方式中唯一能讓面試官高潮的方式。

Redis高級分布式鎖:Redlock

antirez提出的redlock算法大概是這樣的:

在Redis的分布式環(huán)境中,我們假設(shè)有N個Redis master。這些節(jié)點完全互相獨立,不存在主從復(fù)制或者其他集群協(xié)調(diào)機制。我們確保將在N個實例上使用與在Redis單實例下相同方法獲取和釋放鎖?,F(xiàn)在我們假設(shè)有5個Redis master節(jié)點,同時我們需要在5臺服務(wù)器上面運行這些Redis實例,這樣保證他們不會同時都宕掉。

為了取到鎖,客戶端應(yīng)該執(zhí)行以下操作:

  • 獲取當(dāng)前Unix時間,以毫秒為單位。
  • 依次嘗試從5個實例,使用相同的key和具有唯一性的value(例如UUID)獲取鎖。當(dāng)向Redis請求獲取鎖時,客戶端應(yīng)該設(shè)置一個網(wǎng)絡(luò)連接和響應(yīng)超時時間,這個超時時間應(yīng)該小于鎖的失效時間。例如你的鎖自動失效時間TTL為10秒,則超時時間應(yīng)該在5-50毫秒之間。這樣可以避免服務(wù)器端Redis已經(jīng)掛掉的情況下,客戶端還在死死地等待響應(yīng)結(jié)果。如果服務(wù)器端沒有在規(guī)定時間內(nèi)響應(yīng),客戶端應(yīng)該盡快嘗試去另外一個Redis實例請求獲取鎖。
  • 客戶端使用當(dāng)前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖使用的時間。當(dāng)且僅當(dāng)從大多數(shù)(N/2+1,這里是3個節(jié)點)的Redis節(jié)點都取到鎖,并且使用的時間小于鎖失效時間時,鎖才算獲取成功。
  • 如果取到了鎖,key的真正有效時間等于有效時間減去獲取鎖所使用的時間(步驟3計算的結(jié)果)。
  • 如果因為某些原因,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經(jīng)超過了有效時間),客戶端應(yīng)該在所有的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功,防止某些節(jié)點獲取到鎖但是客戶端沒有得到響應(yīng)而導(dǎo)致接下來的一段時間不能被重新獲取鎖)。
  • 此處不討論時鐘漂移

Redlock源碼

redisson已經(jīng)有對redlock算法封裝,接下來對其用法進行簡單介紹,并對核心源碼進行分析(假設(shè)5個redis實例)。

1. Redlock依賴

!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
dependency>
 groupId>org.redisson/groupId>
 artifactId>redisson/artifactId>
 version>3.3.2/version>
/dependency>

2. Redlock用法

首先,我們來看一下redission封裝的redlock算法實現(xiàn)的分布式鎖用法,非常簡單,跟重入鎖(ReentrantLock)有點類似:

Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
 .setMasterName("masterName")
 .setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 還可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
 isLock = redLock.tryLock();
 // 500ms拿不到鎖, 就認(rèn)為獲取鎖失敗。10000ms即10s是鎖失效時間。
 isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
 if (isLock) {
 //TODO if get lock success, do something;
 }
} catch (Exception e) {
} finally {
 // 無論如何, 最后都要解鎖
 redLock.unlock();
}

3. Redlock唯一ID

實現(xiàn)分布式鎖的一個非常重要的點就是set的value要具有唯一性,redisson的value是怎樣保證value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock("REDLOCK_KEY"),源碼在Redisson.java和RedissonLock.java中:

protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
 return id + ":" + threadId;
}

4. Redlock獲取鎖

獲取鎖的代碼為redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),兩者的最終核心源碼都是下面這段代碼,只不過前者獲取鎖的默認(rèn)租約時間(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:

T> RFutureT> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT> command) {
 internalLockLeaseTime = unit.toMillis(leaseTime);
 // 獲取鎖時向5個redis實例發(fā)送的命令
 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
  // 首先分布式鎖的KEY不能存在,如果確實不存在,那么執(zhí)行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通過pexpire設(shè)置失效時間(也是鎖的租約時間)
  "if (redis.call('exists', KEYS[1]) == 0) then " +
   "redis.call('hset', KEYS[1], ARGV[2], 1); " +
   "redis.call('pexpire', KEYS[1], ARGV[1]); " +
   "return nil; " +
  "end; " +
  // 如果分布式鎖的KEY已經(jīng)存在,并且value也匹配,表示是當(dāng)前線程持有的鎖,那么重入次數(shù)加1,并且設(shè)置失效時間
  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
   "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
   "redis.call('pexpire', KEYS[1], ARGV[1]); " +
   "return nil; " +
  "end; " +
  // 獲取分布式鎖的KEY的失效時間毫秒數(shù)
  "return redis.call('pttl', KEYS[1]);",
  // 這三個參數(shù)分別對應(yīng)KEYS[1],ARGV[1]和ARGV[2]
  Collections.Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

獲取鎖的命令中,

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式鎖的key,即REDLOCK_KEY;
  • ARGV[1]就是internalLockLeaseTime,即鎖的租約時間,默認(rèn)30s;
  • ARGV[2]就是getLockName(threadId),是獲取鎖時set的唯一值,即UUID+threadId:

5. Redlock釋放鎖

釋放鎖的代碼為redLock.unlock(),核心源碼如下:

protected RFutureBoolean> unlockInnerAsync(long threadId) {
 // 向5個redis實例都執(zhí)行如下命令
 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  // 如果分布式鎖KEY不存在,那么向channel發(fā)布一條消息
  "if (redis.call('exists', KEYS[1]) == 0) then " +
  "redis.call('publish', KEYS[2], ARGV[1]); " +
  "return 1; " +
  "end;" +
  // 如果分布式鎖存在,但是value不匹配,表示鎖已經(jīng)被占用,那么直接返回
  "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  "return nil;" +
  "end; " +
  // 如果就是當(dāng)前線程占有分布式鎖,那么將重入次數(shù)減1
  "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  // 重入次數(shù)減1后的值如果大于0,表示分布式鎖有重入過,那么只設(shè)置失效時間,還不能刪除
  "if (counter > 0) then " +
  "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  "return 0; " +
  "else " +
  // 重入次數(shù)減1后的值如果為0,表示分布式鎖只獲取過1次,那么刪除這個KEY,并發(fā)布解鎖消息
  "redis.call('del', KEYS[1]); " +
  "redis.call('publish', KEYS[2], ARGV[1]); " +
  "return 1; "+
  "end; " +
  "return nil;",
  // 這5個參數(shù)分別對應(yīng)KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
  Arrays.Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

Redis實現(xiàn)的分布式鎖輪子

下面利用SpringBoot + Jedis + AOP的組合來實現(xiàn)一個簡易的分布式鎖。

1. 自定義注解

自定義一個注解,被注解的方法會執(zhí)行獲取分布式鎖的邏輯

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {
 /**
 * 業(yè)務(wù)鍵
 *
 * @return
 */
 String key();
 /**
 * 鎖的過期秒數(shù),默認(rèn)是5秒
 *
 * @return
 */
 int expire() default 5;

 /**
 * 嘗試加鎖,最多等待時間
 *
 * @return
 */
 long waitTime() default Long.MIN_VALUE;
 /**
 * 鎖的超時時間單位
 *
 * @return
 */
 TimeUnit timeUnit() default TimeUnit.SECONDS;
}

2. AOP攔截器實現(xiàn)

在AOP中我們?nèi)?zhí)行獲取分布式鎖和釋放分布式鎖的邏輯,代碼如下:

@Aspect
@Component
public class LockMethodAspect {
 @Autowired
 private RedisLockHelper redisLockHelper;
 @Autowired
 private JedisUtil jedisUtil;
 private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);

 @Around("@annotation(com.redis.lock.annotation.RedisLock)")
 public Object around(ProceedingJoinPoint joinPoint) {
 Jedis jedis = jedisUtil.getJedis();
 MethodSignature signature = (MethodSignature) joinPoint.getSignature();
 Method method = signature.getMethod();

 RedisLock redisLock = method.getAnnotation(RedisLock.class);
 String value = UUID.randomUUID().toString();
 String key = redisLock.key();
 try {
  final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
  logger.info("isLock : {}",islock);
  if (!islock) {
  logger.error("獲取鎖失敗");
  throw new RuntimeException("獲取鎖失敗");
  }
  try {
  return joinPoint.proceed();
  } catch (Throwable throwable) {
  throw new RuntimeException("系統(tǒng)異常");
  }
 } finally {
  logger.info("釋放鎖");
  redisLockHelper.unlock(jedis,key, value);
  jedis.close();
 }
 }
}

3. Redis實現(xiàn)分布式鎖核心類

@Component
public class RedisLockHelper {
 private long sleepTime = 100;
 /**
 * 直接使用setnx + expire方式獲取分布式鎖
 * 非原子性
 *
 * @param key
 * @param value
 * @param timeout
 * @return
 */
 public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {
 Long result = jedis.setnx(key, value);
 // result = 1時,設(shè)置成功,否則設(shè)置失敗
 if (result == 1L) {
  return jedis.expire(key, timeout) == 1L;
 } else {
  return false;
 }
 }

 /**
 * 使用Lua腳本,腳本中使用setnex+expire命令進行加鎖操作
 *
 * @param jedis
 * @param key
 * @param UniqueId
 * @param seconds
 * @return
 */
 public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) {
 String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
  "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
 ListString> keys = new ArrayList>();
 ListString> values = new ArrayList>();
 keys.add(key);
 values.add(UniqueId);
 values.add(String.valueOf(seconds));
 Object result = jedis.eval(lua_scripts, keys, values);
 //判斷是否成功
 return result.equals(1L);
 }

 /**
 * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令
 *
 * @param key
 * @param value
 * @param timeout
 * @return
 */
 public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) {
 long seconds = timeUnit.toSeconds(timeout);
 return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
 }

 /**
 * 自定義獲取鎖的超時時間
 *
 * @param jedis
 * @param key
 * @param value
 * @param timeout
 * @param waitTime
 * @param timeUnit
 * @return
 * @throws InterruptedException
 */
 public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException {
 long seconds = timeUnit.toSeconds(timeout);
 while (waitTime >= 0) {
  String result = jedis.set(key, value, "nx", "ex", seconds);
  if ("OK".equals(result)) {
  return true;
  }
  waitTime -= sleepTime;
  Thread.sleep(sleepTime);
 }
 return false;
 }
 /**
 * 錯誤的解鎖方法—直接刪除key
 *
 * @param key
 */
 public void unlock_with_del(Jedis jedis,String key) {
 jedis.del(key);
 }

 /**
 * 使用Lua腳本進行解鎖操縱,解鎖的時候驗證value值
 *
 * @param jedis
 * @param key
 * @param value
 * @return
 */
 public boolean unlock(Jedis jedis,String key,String value) {
 String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
  "return redis.call('del',KEYS[1]) else return 0 end";
 return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
 }
}

4. Controller層控制

定義一個TestController來測試我們實現(xiàn)的分布式鎖

@RestController
public class TestController {
 @RedisLock(key = "redis_lock")
 @GetMapping("/index")
 public String index() {
 return "index";
 }
}

站在巨人的肩膀上

1.Redlock:Redis分布式鎖最牛逼的實現(xiàn)

2.基于Redis的分布式鎖實現(xiàn)

到此這篇關(guān)于Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)的文章就介紹到這了,更多相關(guān)Redis分布式鎖RedLock內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • SpringBoot 集成 ShedLock 分布式鎖的示例詳解
  • SpringBoot之使用Redis實現(xiàn)分布式鎖(秒殺系統(tǒng))
  • SpringBoot中使用redis做分布式鎖的方法
  • SpringBoot整合Redis正確的實現(xiàn)分布式鎖的示例代碼
  • SpringBoot使用Redis實現(xiàn)分布式鎖
  • SpringBoot + Spring Cloud Consul 服務(wù)注冊和發(fā)現(xiàn)詳細解析
  • Spring boot2X Consul如何使用Feign實現(xiàn)服務(wù)調(diào)用
  • Spring boot2X Consul如何通過RestTemplate實現(xiàn)服務(wù)調(diào)用
  • 利用consul在spring boot中實現(xiàn)分布式鎖場景分析

標(biāo)簽:楊凌 果洛 吉安 北京 朝陽 江蘇 臺州 大慶

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法》,本文關(guān)鍵詞  Redis,分布式,鎖,升級版,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法》相關(guān)的同類信息!
  • 本頁收集關(guān)于Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章