主頁(yè) > 知識(shí)庫(kù) > redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))

redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))

熱門(mén)標(biāo)簽:鄭州人工智能電銷(xiāo)機(jī)器人系統(tǒng) 日本中國(guó)地圖標(biāo)注 十堰營(yíng)銷(xiāo)電銷(xiāo)機(jī)器人哪家便宜 山東外呼銷(xiāo)售系統(tǒng)招商 魔獸2青云地圖標(biāo)注 北京400電話(huà)辦理收費(fèi)標(biāo)準(zhǔn) 宿遷便宜外呼系統(tǒng)平臺(tái) 貴州電銷(xiāo)卡外呼系統(tǒng) 超呼電話(huà)機(jī)器人

背景

項(xiàng)目中的流程監(jiān)控,有幾種節(jié)點(diǎn),需要監(jiān)控每一個(gè)節(jié)點(diǎn)是否超時(shí)。按傳統(tǒng)的做法,肯定是通過(guò)定時(shí)任務(wù),去掃描然后判斷,但是定時(shí)任務(wù)有缺點(diǎn):1,數(shù)據(jù)量大會(huì)慢;2,時(shí)間不好控制,太短,怕一次處理不完,太長(zhǎng)狀態(tài)就會(huì)有延遲。所以就想到用延遲隊(duì)列的方式去實(shí)現(xiàn)。

一,redis的過(guò)期key監(jiān)控

1,開(kāi)啟過(guò)期key監(jiān)聽(tīng)

在redis的配置里把這個(gè)注釋去掉

notify-keyspace-events Ex

然后重啟redis

2,使用redis過(guò)期監(jiān)聽(tīng)實(shí)現(xiàn)延遲隊(duì)列

繼承KeyExpirationEventMessageListener類(lèi),實(shí)現(xiàn)父類(lèi)的方法,就可以監(jiān)聽(tīng)key過(guò)期時(shí)間了。當(dāng)有key過(guò)期,就會(huì)執(zhí)行這里。這里就把需要的key過(guò)濾出來(lái),然后發(fā)送給kafka隊(duì)列。

@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

  @Autowired
  private KafkaProducerService kafkaProducerService;

  public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
    super(listenerContainer);
  }

  /**
   * 針對(duì) redis 數(shù)據(jù)失效事件,進(jìn)行數(shù)據(jù)處理
   * @param message
   * @param pattern
   */
  @Override
  public void onMessage(Message message, byte[] pattern){
    if(message == null || StringUtils.isEmpty(message.toString())){
      return;
    }
    String content = message.toString();
    //key的格式為  flag:時(shí)效類(lèi)型:運(yùn)單號(hào) 示例如下
    try {
      if(content.startsWith(AbnConstant.EMS)){
        kafkaProducerService.sendMessageSync(TopicConstant.EMS_WAYBILL_ABN_QUEUE,content);
      }else if(content.startsWith(AbnConstant.YUNDA)){
        kafkaProducerService.sendMessageSync(TopicConstant.YUNDA_WAYBILL_ABN_QUEUE,content);
      }
    } catch (Exception e) {
      log.error("監(jiān)控過(guò)期key,發(fā)送kafka異常,",e);
    }
  }
}

可以看的出來(lái),這種方式其實(shí)是很簡(jiǎn)單的,但是有幾個(gè)問(wèn)題需要注意,一是,這個(gè)盡量單機(jī)運(yùn)行,因?yàn)槎嗯_(tái)機(jī)器都會(huì)執(zhí)行,浪費(fèi)cpu,增加數(shù)據(jù)庫(kù)負(fù)擔(dān)。二是,機(jī)器頻繁部署的時(shí)候,如果有時(shí)間間隔,會(huì)出現(xiàn)數(shù)據(jù)的漏處理。

二,redis的zset實(shí)現(xiàn)延遲隊(duì)列

1,生產(chǎn)者實(shí)現(xiàn)

可以看到生產(chǎn)者很簡(jiǎn)單,其實(shí)就是利用zset的特性,給一個(gè)zset添加元素而已,而時(shí)間就是它的score。

public void produce(Integer taskId, long exeTime) {
  System.out.println("加入任務(wù), taskId: " + taskId + ", exeTime: " + exeTime + ", 當(dāng)前時(shí)間:" + LocalDateTime.now());
  RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
}

2,消費(fèi)者實(shí)現(xiàn)

消費(fèi)者的代碼也不難,就是把已經(jīng)過(guò)期的zset中的元素給刪除掉,然后處理數(shù)據(jù)。

public void consumer() {
  Executors.newSingleThreadExecutor().submit(new Runnable() {
    @Override
    public void run() {
      while (true) {
        SetString> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
        if (taskIdSet == null || taskIdSet.isEmpty()) {
          System.out.println("沒(méi)有任務(wù)");
 
        } else {
          taskIdSet.forEach(id -> {
            long result = RedisOps.getJedis().zrem(RedisOps.key, id);
            if (result == 1L) {
              System.out.println("從延時(shí)隊(duì)列中獲取到任務(wù),taskId:" + id + " , 當(dāng)前時(shí)間:" + LocalDateTime.now());
            }
          });
        }
        try {
          TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  });
}

可以看到這種方式其實(shí)是比上個(gè)方式要好的。因?yàn)?,他的那兩個(gè)缺點(diǎn)都被克服掉了。多臺(tái)機(jī)器也沒(méi)事兒,也不用再擔(dān)心部署時(shí)間間隔長(zhǎng)的問(wèn)題。

總結(jié)

兩個(gè)方式都是不錯(cuò)的,都能解決問(wèn)題。碰到問(wèn)題,多思考,多總結(jié)。

到此這篇關(guān)于redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))的文章就介紹到這了,更多相關(guān)redis 延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • golang實(shí)現(xiàn)redis的延時(shí)消息隊(duì)列功能示例
  • 使用Redis實(shí)現(xiàn)延時(shí)任務(wù)的解決方案
  • 利用Redis實(shí)現(xiàn)延時(shí)處理的方法實(shí)例

標(biāo)簽:果洛 朝陽(yáng) 吉安 江蘇 臺(tái)州 大慶 北京 楊凌

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))》,本文關(guān)鍵詞  redis,實(shí)現(xiàn),延時(shí),隊(duì)列,的,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))》相關(guān)的同類(lèi)信息!
  • 本頁(yè)收集關(guān)于redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章