主頁(yè) > 知識(shí)庫(kù) > redis通過(guò)pipeline提升吞吐量的方法

redis通過(guò)pipeline提升吞吐量的方法

熱門(mén)標(biāo)簽:地圖標(biāo)注如何即時(shí)生效 太原營(yíng)銷(xiāo)外呼系統(tǒng) 玄武湖地圖標(biāo)注 西藏教育智能外呼系統(tǒng)價(jià)格 竹間科技AI電銷(xiāo)機(jī)器人 百度商家地圖標(biāo)注怎么做 小紅書(shū)怎么地圖標(biāo)注店 最簡(jiǎn)單的百度地圖標(biāo)注 地圖標(biāo)注費(fèi)用

案例目標(biāo)

簡(jiǎn)單介紹 redis pipeline 的機(jī)制,結(jié)合一段實(shí)例說(shuō)明pipeline 在提升吞吐量方面發(fā)生的效用。

案例背景

應(yīng)用系統(tǒng)在數(shù)據(jù)推送或事件處理過(guò)程中,往往出現(xiàn)數(shù)據(jù)流經(jīng)過(guò)多個(gè)網(wǎng)元;

然而在某些服務(wù)中,數(shù)據(jù)操作對(duì)redis 是強(qiáng)依賴(lài)的,在最近的一次分析中發(fā)現(xiàn):

一次數(shù)據(jù)推送會(huì)對(duì) redis 產(chǎn)生近30次讀寫(xiě)操作!

在數(shù)據(jù)推送業(yè)務(wù)中的性能壓測(cè)中,以數(shù)據(jù)上報(bào) -> 下發(fā)應(yīng)答為一次事務(wù);而對(duì)于這樣的讀寫(xiě)模型,redis 的操作過(guò)于頻繁,很快便導(dǎo)致系統(tǒng)延時(shí)過(guò)高,吞吐量低下,無(wú)法滿(mǎn)足目標(biāo);

優(yōu)化過(guò)程 主要針對(duì)業(yè)務(wù)代碼做的優(yōu)化,其中redis 操作經(jīng)過(guò)大量合并,最終降低到原來(lái)的1/5,而系統(tǒng)吞吐量也提升明顯。

其中,redis pipeline(管道機(jī)制) 的應(yīng)用是一個(gè)關(guān)鍵手段。

pipeline的解釋

Pipeline指的是管道技術(shù),指的是客戶(hù)端允許將多個(gè)請(qǐng)求依次發(fā)給服務(wù)器,過(guò)程中而不需要等待請(qǐng)求的回復(fù),在最后再一并讀取結(jié)果即可。

管道技術(shù)使用廣泛,例如許多POP3協(xié)議已經(jīng)實(shí)現(xiàn)支持這個(gè)功能,大大加快了從服務(wù)器下載新郵件的過(guò)程。 Redis很早就支持管道(pipeline)技術(shù)。(因此無(wú)論你運(yùn)行的是什么版本,你都可以使用管道(pipelining)操作Redis)

普通請(qǐng)求模型

[圖-pipeline1]

Pipeline請(qǐng)求模型

[圖-pipeline2]

從兩個(gè)圖的對(duì)比中可看出,普通的請(qǐng)求模型是同步的,每次請(qǐng)求對(duì)應(yīng)一次IO操作等待;

而Pipeline 化之后所有的請(qǐng)求合并為一次IO,除了時(shí)延可以降低之外,還能大幅度提升系統(tǒng)吞吐量。

代碼實(shí)例

說(shuō)明

本地開(kāi)啟50個(gè)線程,每個(gè)線程完成1000個(gè)key的寫(xiě)入,對(duì)比pipeline開(kāi)啟及不開(kāi)啟兩種場(chǎng)景下的性能表現(xiàn)。

相關(guān)常量

// 并發(fā)任務(wù)
private static final int taskCount = 50;
// pipeline大小
private static final int batchSize = 10;
// 每個(gè)任務(wù)處理命令數(shù)
private static final int cmdCount = 1000;

private static final boolean usePipeline = true;

初始化連接

JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxActive(200);
poolConfig.setMaxIdle(100);
poolConfig.setMaxWait(2000);
poolConfig.setTestOnBorrow(false);
poolConfig.setTestOnReturn(false);

jedisPool = new JedisPool(poolConfig, host, port);

并發(fā)啟動(dòng)任務(wù),統(tǒng)計(jì)執(zhí)行時(shí)間

public static void main(String[] args) throws InterruptedException {
  init();

  flushDB();

  long t1 = System.currentTimeMillis();
  ExecutorService executor = Executors.newCachedThreadPool();

  CountDownLatch latch = new CountDownLatch(taskCount);
  for (int i = 0; i  taskCount; i++) {
   executor.submit(new DemoTask(i, latch));
  }

  latch.await();
  executor.shutdownNow();

  long t2 = System.currentTimeMillis();

  System.out.println("execution finish time(s):" + (t2 - t1) / 1000.0);

 }

DemoTask 封裝了執(zhí)行key寫(xiě)入的細(xì)節(jié),區(qū)分不同場(chǎng)景

 public void run() {
   logger.info("Task[{}] start.", id);
   try {
    if (usePipeline) {
     runWithPipeline();
    } else {
     runWithNonPipeline();
    }
   } finally {
    latch.countDown();
   }

   logger.info("Task[{}] end.", id);
  }

不使用Pipeline的場(chǎng)景比較簡(jiǎn)單,循環(huán)執(zhí)行set操作

for (int i = 0; i  cmdCount; i++) {
    Jedis jedis = get();
    try {
     jedis.set(key(i), UUID.randomUUID().toString());
    } finally {
     if (jedis != null) {
      jedisPool.returnResource(jedis);
     }
    }
    if (i % batchSize == 0) {
     logger.info("Task[{}] process -- {}", id, i);
    }
   }

使用Pipeline,需要處理分段,如10個(gè)作為一批命令執(zhí)行

for (int i = 0; i  cmdCount;) {
    Jedis jedis = get();

    try {
     Pipeline pipeline = jedis.pipelined();
     int j;
     for (j = 0; j  batchSize; j++) {
      if (i + j  cmdCount) {
       pipeline.set(key(i + j), UUID.randomUUID().toString());
      } else {
       break;
      }
     }
     pipeline.sync();
     logger.info("Task[{}] pipeline -- {}", id, i + j);

     i += j;

    } finally {
     if (jedis != null) {
      jedisPool.returnResource(jedis);
     }
    }

   }

運(yùn)行結(jié)果

不使用Pipeline,整體執(zhí)行26s;而使用Pipeline優(yōu)化后的代碼,執(zhí)行時(shí)間僅需要3s!

NoPipeline-stat

[圖-nopipeline]

Pipeline-stat

[圖-pipeline]

注意事項(xiàng)

pipeline機(jī)制可以?xún)?yōu)化吞吐量,但無(wú)法提供原子性/事務(wù)保障,而這個(gè)可以通過(guò)Redis-Multi等命令實(shí)現(xiàn)。

參考這里

部分讀寫(xiě)操作存在相關(guān)依賴(lài),無(wú)法使用pipeline實(shí)現(xiàn),可利用Script機(jī)制,但需要在可維護(hù)性方面做好取舍。

擴(kuò)展閱讀

官方文檔-Redis-Pipelining

官方文檔-Redis-Transaction

以上這篇redis通過(guò)pipeline提升吞吐量的方法就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

您可能感興趣的文章:
  • python中sklearn的pipeline模塊實(shí)例詳解
  • Redis利用Pipeline加速查詢(xún)速度的方法
  • 在Redis集群中使用pipeline批量插入的實(shí)現(xiàn)方法
  • 使用Jenkins Pipeline自動(dòng)化構(gòu)建發(fā)布Java項(xiàng)目的方法
  • python使用pipeline批量讀寫(xiě)redis的方法
  • Python:Scrapy框架中Item Pipeline組件使用詳解
  • 詳解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作
  • 如何使用pipeline和jacoco獲取自動(dòng)化測(cè)試代碼覆蓋率

標(biāo)簽:景德鎮(zhèn) 香港 揚(yáng)州 唐山 贛州 林芝 澳門(mén) 廣東

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《redis通過(guò)pipeline提升吞吐量的方法》,本文關(guān)鍵詞  redis,通過(guò),pipeline,提升,吞吐量,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《redis通過(guò)pipeline提升吞吐量的方法》相關(guān)的同類(lèi)信息!
  • 本頁(yè)收集關(guān)于redis通過(guò)pipeline提升吞吐量的方法的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章