近期工作遇到需要業(yè)務(wù)場景如下,需要每天定時推送給另一系統(tǒng)一批數(shù)據(jù),但是由于系統(tǒng)是集群部署的,會造成統(tǒng)一情況下任務(wù)爭用的情況,所以需要增加分布式鎖來保證一定時間范圍內(nèi)有一個Job來完成定時任務(wù). 前期考慮的方案有采用ZooKeeper分布式任務(wù),Quartz分布式任務(wù)調(diào)度,但是由于Zookeeper需要增加額外組件,Quartz需要增加表,并且項目中現(xiàn)在已經(jīng)有Redis這一組件存在,所以考慮采用Redis分布式鎖的情況來完成分布式任務(wù)搶占這一功能
記錄一下走過的彎路.
第一版本:
@Override
public T> Long set(String key,T value, Long cacheSeconds) {
if (value instanceof HashMap) {
BoundHashOperations valueOperations = redisTemplate.boundHashOps(key);
valueOperations.putAll((Map) value);
valueOperations.expire(cacheSeconds, TimeUnit.SECONDS);
}
else{
//使用map存儲
BoundHashOperations valueOperations = redisTemplate.boundHashOps(key);
valueOperations.put(key, value);
//秒
valueOperations.expire(cacheSeconds, TimeUnit.SECONDS);
}
return null;
}
@Override
public void del(String key) {
redisTemplate.delete(key);
}
采用set 和 del 完成鎖的占用與釋放,后經(jīng)測試得知,set不是線程安全,在并發(fā)情況下常常會導(dǎo)致數(shù)據(jù)不一致.
第二版本:
/**
* 分布式鎖
* @param range 鎖的長度 允許有多少個請求搶占資源
* @param key
* @return
*/
public boolean getLock(int range, String key) {
ValueOperationsString, Integer> valueOper1 = template.opsForValue();
return valueOper1.increment(key, 1) = range;
}
/**
* 初始化鎖, 設(shè)置等于0
* @param key
* @param expireSeconds
* @return
*/
public void initLock(String key, Long expireSeconds) {
ValueOperationsString, Integer> operations = template.opsForValue();
template.setKeySerializer(new GenericJackson2JsonRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
operations.set(key, 0, expireSeconds * 1000);
}
/**
* 釋放鎖
* @param key
*/
public void releaseLock(String key) {
ValueOperationsString, Integer> operations = template.opsForValue();
template.setKeySerializer(new GenericJackson2JsonRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
template.delete(key);
}
采用redis的 increament操作完成鎖的搶占.但是釋放鎖時,是每個線程都可以刪除redis中的key值. 并且initLock會降上一次的操作給覆蓋掉,所以也廢棄掉此方法
最終版本:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.stereotype.Service;
import org.springframework.util.ReflectionUtils;
import redis.clients.jedis.Jedis;
import java.lang.reflect.Field;
import java.util.Collections;
@Service
public class RedisLock {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
@Autowired
private RedisConnectionFactory connectionFactory;
/**
* 嘗試獲取分布式鎖
* @param lockKey 鎖
* @param requestId 請求標(biāo)識
* @param expireTime 超期時間
* @return 是否獲取成功
*/
public boolean lock(String lockKey, String requestId, int expireTime) {
Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis");
ReflectionUtils.makeAccessible(jedisField);
Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection());
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 釋放分布式鎖
* @param lockKey 鎖
* @param requestId 請求標(biāo)識
* @return 是否釋放成功
*/
public boolean releaseLock(String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = getJedis().eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
public Jedis getJedis() {
Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis");
ReflectionUtils.makeAccessible(jedisField);
Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection());
return jedis;
}
}