主頁 > 知識(shí)庫 > 解析Facebook的數(shù)據(jù)庫查詢引擎Presto在美團(tuán)的應(yīng)用

解析Facebook的數(shù)據(jù)庫查詢引擎Presto在美團(tuán)的應(yīng)用

熱門標(biāo)簽:機(jī)器人電銷哪個(gè)牌子好 浙江呼叫中心外呼系統(tǒng)多少錢 地圖標(biāo)注操作方法 清遠(yuǎn)語音外呼系統(tǒng)平臺(tái) 地圖標(biāo)注銷售好做嗎 廣西防封卡外呼系統(tǒng)原理是什么 阿里機(jī)器人電銷 地圖標(biāo)注標(biāo)記位置導(dǎo)航 電銷外呼系統(tǒng)罵人

Facebook的數(shù)據(jù)倉庫存儲(chǔ)在少量大型Hadoop/HDFS集群。Hive是Facebook在幾年前專為Hadoop打造的一款數(shù)據(jù)倉庫工具。在以前,F(xiàn)acebook的科學(xué)家和分析師一直依靠Hive來做數(shù)據(jù)分析。但Hive使用MapReduce作為底層計(jì)算框架,是專為批處理設(shè)計(jì)的。但隨著數(shù)據(jù)越來越多,使用Hive進(jìn)行一個(gè)簡單的數(shù)據(jù)查詢可能要花費(fèi)幾分到幾小時(shí),顯然不能滿足交互式查詢的需求。Facebook也調(diào)研了其他比Hive更快的工具,但它們要么在功能有所限制要么就太簡單,以至于無法操作Facebook龐大的數(shù)據(jù)倉庫。

2012年開始試用的一些外部項(xiàng)目都不合適,他們決定自己開發(fā),這就是Presto。2012年秋季開始開發(fā),目前該項(xiàng)目已經(jīng)在超過 1000名Facebook雇員中使用,運(yùn)行超過30000個(gè)查詢,每日數(shù)據(jù)在1PB級(jí)別。Facebook稱Presto的性能比Hive要好上10倍多。2013年Facebook正式宣布開源Presto。

本文首先介紹Presto從用戶提交SQL到執(zhí)行的這一個(gè)過程,然后嘗試對(duì)Presto實(shí)現(xiàn)實(shí)時(shí)查詢的原理進(jìn)行分析和總結(jié),最后介紹Presto在美團(tuán)的使用情況。

Presto架構(gòu)

Presto查詢引擎是一個(gè)Master-Slave的架構(gòu),由一個(gè)Coordinator節(jié)點(diǎn),一個(gè)Discovery Server節(jié)點(diǎn),多個(gè)Worker節(jié)點(diǎn)組成,Discovery Server通常內(nèi)嵌于Coordinator節(jié)點(diǎn)中。Coordinator負(fù)責(zé)解析SQL語句,生成執(zhí)行計(jì)劃,分發(fā)執(zhí)行任務(wù)給Worker節(jié)點(diǎn)執(zhí)行。Worker節(jié)點(diǎn)負(fù)責(zé)實(shí)際執(zhí)行查詢?nèi)蝿?wù)。Worker節(jié)點(diǎn)啟動(dòng)后向Discovery Server服務(wù)注冊(cè),Coordinator從Discovery Server獲得可以正常工作的Worker節(jié)點(diǎn)。如果配置了Hive Connector,需要配置一個(gè)Hive MetaStore服務(wù)為Presto提供Hive元信息,Worker節(jié)點(diǎn)與HDFS交互讀取數(shù)據(jù)。

Presto執(zhí)行查詢過程簡介
既然Presto是一個(gè)交互式的查詢引擎,我們最關(guān)心的就是Presto實(shí)現(xiàn)低延時(shí)查詢的原理,我認(rèn)為主要是下面幾個(gè)關(guān)鍵點(diǎn),當(dāng)然還有一些傳統(tǒng)的SQL優(yōu)化原理,這里不介紹了。

完全基于內(nèi)存的并行計(jì)算
流水線
本地化計(jì)算
動(dòng)態(tài)編譯執(zhí)行計(jì)劃
小心使用內(nèi)存和數(shù)據(jù)結(jié)構(gòu)
類BlinkDB的近似查詢
GC控制
為了介紹上述幾個(gè)要點(diǎn),這里先介紹一下Presto執(zhí)行查詢的過程

提交查詢
用戶使用Presto Cli提交一個(gè)查詢語句后,Cli使用HTTP協(xié)議與Coordinator通信,Coordinator收到查詢請(qǐng)求后調(diào)用SqlParser解析SQL語句得到Statement對(duì)象,并將Statement封裝成一個(gè)QueryStarter對(duì)象放入線程池中等待執(zhí)行。

SQL編譯過程
Presto與Hive一樣,使用Antlr編寫SQL語法,語法規(guī)則定義在Statement.g和StatementBuilder.g兩個(gè)文件中。
如下圖中所示從SQL編譯為最終的物理執(zhí)行計(jì)劃大概分為5部,最終生成在每個(gè)Worker節(jié)點(diǎn)上運(yùn)行的LocalExecutionPlan,這里不詳細(xì)介紹SQL解析為邏輯執(zhí)行計(jì)劃的過程,通過一個(gè)SQL語句來理解查詢計(jì)劃生成之后的計(jì)算過程。

樣例SQL:

復(fù)制代碼
代碼如下:
select c1.rank, count(*) from dim.city c1 join dim.city c2 on c1.id = c2.id where c1.id > 10 group by c1.rank limit 10;

上面的SQL語句生成的邏輯執(zhí)行計(jì)劃Plan如上圖所示。那么Presto是如何對(duì)上面的邏輯執(zhí)行計(jì)劃進(jìn)行拆分以較高的并行度去執(zhí)行完這個(gè)計(jì)劃呢,我們來看看物理執(zhí)行計(jì)劃。

物理執(zhí)行計(jì)劃
邏輯執(zhí)行計(jì)劃圖中的虛線就是Presto對(duì)邏輯執(zhí)行計(jì)劃的切分點(diǎn),邏輯計(jì)劃Plan生成的SubPlan分為四個(gè)部分,每一個(gè)SubPlan都會(huì)提交到一個(gè)或者多個(gè)Worker節(jié)點(diǎn)上執(zhí)行。

SubPlan有幾個(gè)重要的屬性planDistribution、outputPartitioning、partitionBy屬性。

PlanDistribution表示一個(gè)查詢Stage的分發(fā)方式,邏輯執(zhí)行計(jì)劃圖中的4個(gè)SubPlan共有3種不同的PlanDistribution方式:Source表示這個(gè)SubPlan是數(shù)據(jù)源,Source類型的任務(wù)會(huì)按照數(shù)據(jù)源大小確定分配多少個(gè)節(jié)點(diǎn)進(jìn)行執(zhí)行;Fixed表示這個(gè)SubPlan會(huì)分配固定的節(jié)點(diǎn)數(shù)進(jìn)行執(zhí)行(Config配置中的query.initial-hash-partitions參數(shù)配置,默認(rèn)是8);None表示這個(gè)SubPlan只分配到一個(gè)節(jié)點(diǎn)進(jìn)行執(zhí)行。在下面的執(zhí)行計(jì)劃中,SubPlan1和SubPlan0 PlanDistribution=Source,這兩個(gè)SubPlan都是提供數(shù)據(jù)源的節(jié)點(diǎn),SubPlan1所有節(jié)點(diǎn)的讀取數(shù)據(jù)都會(huì)發(fā)向SubPlan0的每一個(gè)節(jié)點(diǎn);SubPlan2分配8個(gè)節(jié)點(diǎn)執(zhí)行最終的聚合操作;SubPlan3只負(fù)責(zé)輸出最后計(jì)算完成的數(shù)據(jù)。
OutputPartitioning屬性只有兩個(gè)值HASH和NONE,表示這個(gè)SubPlan的輸出是否按照partitionBy的key值對(duì)數(shù)據(jù)進(jìn)行Shuffle。在下面的執(zhí)行計(jì)劃中只有SubPlan0的OutputPartitioning=HASH,所以SubPlan2接收到的數(shù)據(jù)是按照rank字段Partition后的數(shù)據(jù)。

完全基于內(nèi)存的并行計(jì)算
查詢的并行執(zhí)行流程
Presto SQL的執(zhí)行流程如下圖所示

Cli通過HTTP協(xié)議提交SQL查詢之后,查詢請(qǐng)求封裝成一個(gè)SqlQueryExecution對(duì)象交給Coordinator的SqlQueryManager#queryExecutor線程池去執(zhí)行
每個(gè)SqlQueryExecution線程(圖中Q-X線程)啟動(dòng)后對(duì)查詢請(qǐng)求的SQL進(jìn)行語法解析和優(yōu)化并最終生成多個(gè)Stage的SqlStageExecution任務(wù),每個(gè)SqlStageExecution任務(wù)仍然交給同樣的線程池去執(zhí)行
每個(gè)SqlStageExecution線程(圖中S-X線程)啟動(dòng)后每個(gè)Stage的任務(wù)按PlanDistribution屬性構(gòu)造一個(gè)或者多個(gè)RemoteTask通過HTTP協(xié)議分配給遠(yuǎn)端的Worker節(jié)點(diǎn)執(zhí)行
Worker節(jié)點(diǎn)接收到RemoteTask請(qǐng)求之后,啟動(dòng)一個(gè)SqlTaskExecution線程(圖中T-X線程)將這個(gè)任務(wù)的每個(gè)Split包裝成一個(gè)PrioritizedSplitRunner任務(wù)(圖中SR-X)交給Worker節(jié)點(diǎn)的TaskExecutor#executor線程池去執(zhí)行

上面的執(zhí)行計(jì)劃實(shí)際執(zhí)行效果如下圖所示。

Coordinator通過HTTP協(xié)議調(diào)用Worker節(jié)點(diǎn)的 /v1/task 接口將執(zhí)行計(jì)劃分配給所有Worker節(jié)點(diǎn)(圖中藍(lán)色箭頭)
SubPlan1的每個(gè)節(jié)點(diǎn)讀取一個(gè)Split的數(shù)據(jù)并過濾后將數(shù)據(jù)分發(fā)給每個(gè)SubPlan0節(jié)點(diǎn)進(jìn)行Join操作和Partial Aggr操作
SubPlan1的每個(gè)節(jié)點(diǎn)計(jì)算完成后按GroupBy Key的Hash值將數(shù)據(jù)分發(fā)到不同的SubPlan2節(jié)點(diǎn)
所有SubPlan2節(jié)點(diǎn)計(jì)算完成后將數(shù)據(jù)分發(fā)到SubPlan3節(jié)點(diǎn)
SubPlan3節(jié)點(diǎn)計(jì)算完成后通知Coordinator結(jié)束查詢,并將數(shù)據(jù)發(fā)送給Coordinator

源數(shù)據(jù)的并行讀取
在上面的執(zhí)行計(jì)劃中SubPlan1和SubPlan0都是Source節(jié)點(diǎn),其實(shí)它們讀取HDFS文件數(shù)據(jù)的方式就是調(diào)用的HDFS InputSplit API,然后每個(gè)InputSplit分配一個(gè)Worker節(jié)點(diǎn)去執(zhí)行,每個(gè)Worker節(jié)點(diǎn)分配的InputSplit數(shù)目上限是參數(shù)可配置的,Config中的query.max-pending-splits-per-node參數(shù)配置,默認(rèn)是100。

分布式的Hash聚合
上面的執(zhí)行計(jì)劃在SubPlan0中會(huì)進(jìn)行一次Partial的聚合計(jì)算,計(jì)算每個(gè)Worker節(jié)點(diǎn)讀取的部分?jǐn)?shù)據(jù)的部分聚合結(jié)果,然后SubPlan0的輸出會(huì)按照group by字段的Hash值分配不同的計(jì)算節(jié)點(diǎn),最后SubPlan3合并所有結(jié)果并輸出

流水線
數(shù)據(jù)模型
Presto中處理的最小數(shù)據(jù)單元是一個(gè)Page對(duì)象,Page對(duì)象的數(shù)據(jù)結(jié)構(gòu)如下圖所示。一個(gè)Page對(duì)象包含多個(gè)Block對(duì)象,每個(gè)Block對(duì)象是一個(gè)字節(jié)數(shù)組,存儲(chǔ)一個(gè)字段的若干行。多個(gè)Block橫切的一行是真實(shí)的一行數(shù)據(jù)。一個(gè)Page最大1MB,最多16*1024行數(shù)據(jù)。

節(jié)點(diǎn)內(nèi)部流水線計(jì)算
下圖是一個(gè)Worker節(jié)點(diǎn)內(nèi)部的計(jì)算流程圖,左側(cè)是任務(wù)的執(zhí)行流程圖。

Worker節(jié)點(diǎn)將最細(xì)粒度的任務(wù)封裝成一個(gè)PrioritizedSplitRunner對(duì)象,放入pending split優(yōu)先級(jí)隊(duì)列中。每個(gè)

Worker節(jié)點(diǎn)啟動(dòng)一定數(shù)目的線程進(jìn)行計(jì)算,線程數(shù)task.shard.max-threads=availableProcessors() * 4,在config中配置。

每個(gè)空閑的線程從隊(duì)列中取出一個(gè)PrioritizedSplitRunner對(duì)象執(zhí)行,如果執(zhí)行完成一個(gè)周期,超過最大執(zhí)行時(shí)間1秒鐘,判斷任務(wù)是否執(zhí)行完成,如果完成,從allSplits隊(duì)列中刪除,如果沒有,則放回pendingSplits隊(duì)列中。

每個(gè)任務(wù)的執(zhí)行流程如下圖右側(cè),依次遍歷所有Operator,嘗試從上一個(gè)Operator取一個(gè)Page對(duì)象,如果取得的Page不為空,交給下一個(gè)Operator執(zhí)行。

節(jié)點(diǎn)間流水線計(jì)算
下圖是ExchangeOperator的執(zhí)行流程圖,ExchangeOperator為每一個(gè)Split啟動(dòng)一個(gè)HttpPageBufferClient對(duì)象,主動(dòng)向上一個(gè)Stage的Worker節(jié)點(diǎn)拉數(shù)據(jù),數(shù)據(jù)的最小單位也是一個(gè)Page對(duì)象,取到數(shù)據(jù)后放入Pages隊(duì)列中

本地化計(jì)算
Presto在選擇Source任務(wù)計(jì)算節(jié)點(diǎn)的時(shí)候,對(duì)于每一個(gè)Split,按下面的策略選擇一些minCandidates

優(yōu)先選擇與Split同一個(gè)Host的Worker節(jié)點(diǎn)
如果節(jié)點(diǎn)不夠優(yōu)先選擇與Split同一個(gè)Rack的Worker節(jié)點(diǎn)
如果節(jié)點(diǎn)還不夠隨機(jī)選擇其他Rack的節(jié)點(diǎn)
對(duì)于所有Candidate節(jié)點(diǎn),選擇assignedSplits最少的節(jié)點(diǎn)。

動(dòng)態(tài)編譯執(zhí)行計(jì)劃
Presto會(huì)將執(zhí)行計(jì)劃中的ScanFilterAndProjectOperator和FilterAndProjectOperator動(dòng)態(tài)編譯為Byte Code,并交給JIT去編譯為native代碼。Presto也使用了Google Guava提供的LoadingCache緩存生成的Byte Code。

上面的兩段代碼片段中,第一段為沒有動(dòng)態(tài)編譯前的代碼,第二段代碼為動(dòng)態(tài)編譯生成的Byte Code反編譯之后還原的優(yōu)化代
碼,我們看到這里采用了循環(huán)展開的優(yōu)化方法。

循環(huán)展開最常用來降低循環(huán)開銷,為具有多個(gè)功能單元的處理器提供指令級(jí)并行。也有利于指令流水線的調(diào)度。

小心使用內(nèi)存和數(shù)據(jù)結(jié)構(gòu)
使用Slice進(jìn)行內(nèi)存操作,Slice使用Unsafe#copyMemory實(shí)現(xiàn)了高效的內(nèi)存拷貝,Slice倉庫參考:https://github.com/airlift/slice

Facebook工程師在另一篇介紹ORCFile優(yōu)化的文章中也提到使用Slice將ORCFile的寫性能提高了20%~30%,參考:https://code.facebook.com/posts/229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/

類BlinkDB的近似查詢
為了加快avg、count distinct、percentile等聚合函數(shù)的查詢速度,Presto團(tuán)隊(duì)與BlinkDB作者之一Sameer Agarwal合作引入了一些近似查詢函數(shù)approx_avg、approx_distinct、approx_percentile。approx_distinct使用HyperLogLog Counting算法實(shí)現(xiàn)。

GC控制
Presto團(tuán)隊(duì)在使用hotspot java7時(shí)發(fā)現(xiàn)了一個(gè)JIT的BUG,當(dāng)代碼緩存快要達(dá)到上限時(shí),JIT可能會(huì)停止工作,從而無法將使用頻率高的代碼動(dòng)態(tài)編譯為native代碼。

Presto團(tuán)隊(duì)使用了一個(gè)比較Hack的方法去解決這個(gè)問題,增加一個(gè)線程在代碼緩存達(dá)到70%以上時(shí)進(jìn)行顯式GC,使得已經(jīng)加載的Class從perm中移除,避免JIT無法正常工作的BUG。

美團(tuán)如何使用Presto
選擇presto的原因
2013年我們也用過一段時(shí)間的impala,當(dāng)時(shí)impala不支持線上1.x的hadoop社區(qū)版,所以搭了一個(gè)CDH的小集群,每天將大集群的熱點(diǎn)數(shù)據(jù)導(dǎo)入小集群。但是hadoop集群年前完成升級(jí)2.2之后,當(dāng)時(shí)的impala還不支持2.2 hadoop版本。而Presto剛好開始支持2.x hadoop社區(qū)版,并且Presto在Facebook 300PB大數(shù)據(jù)量的環(huán)境下可以成功的得到大量使用,我們相信它在美團(tuán)也可以很好的支撐我們實(shí)時(shí)分析的需求,于是決定先上線測試使用一段時(shí)間。

部署和使用形式
考慮到兩個(gè)原因:1、由于Hadoop集群主要是夜間完成昨天的計(jì)算任務(wù),白天除了日志寫入外,集群的計(jì)算負(fù)載較低。2、Presto Worker節(jié)點(diǎn)與DataNode節(jié)點(diǎn)布置在一臺(tái)機(jī)器上可以本地計(jì)算。因此我們將Presto部署到了所有的DataNode機(jī)器上,并且夜間停止Presto服務(wù),避免占用集群資源,夜間基本也不會(huì)有用戶查詢數(shù)據(jù)。

Presto二次開發(fā)和BUG修復(fù)
年后才正式上線Presto查詢引擎,0.60版本,使用的時(shí)間不長,但是也遇到了一些問題:

美團(tuán)的Hadoop使用的是2.2版本,并且開啟了Security模式,但是Presto不支持Kerberos認(rèn)證,我們修改了Presto代碼,增加了Kerberos認(rèn)證的功能。
Presto還不支持SQL的隱式類型轉(zhuǎn)換,而Hive支持,很多自助查詢的用戶習(xí)慣了Hive,導(dǎo)致使用Presto時(shí)都會(huì)出現(xiàn)表達(dá)式中左右變量類型不匹配的問題,我們?cè)黾恿穗[式類型轉(zhuǎn)換的功能,大大減小了用戶SQL出錯(cuò)的概率。
Presto不支持查詢lzo壓縮的數(shù)據(jù),需要修改hadoop-lzo的代碼。
解決了一個(gè)having子句中有distinct字段時(shí)查詢失敗的BUG,并反饋了Presto團(tuán)隊(duì) https://github.com/facebook/presto/pull/1104
所有代碼的修改可以參考我們?cè)趃ithub上的倉庫 https://github.com/MTDATA/presto/commits/mt-0.60

實(shí)際使用效果
這里給出一個(gè)公司內(nèi)部開放給分析師、PM、工程師進(jìn)行自助查詢的查詢中心的一個(gè)測試報(bào)告。這里選取了平時(shí)的5000個(gè)Hive查詢,通過Presto查詢的對(duì)比見下面的表格。

標(biāo)簽:伊春 廊坊 臺(tái)灣 沈陽 江蘇 包頭 雅安 德宏

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《解析Facebook的數(shù)據(jù)庫查詢引擎Presto在美團(tuán)的應(yīng)用》,本文關(guān)鍵詞  解析,Facebook,的,數(shù)據(jù)庫,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《解析Facebook的數(shù)據(jù)庫查詢引擎Presto在美團(tuán)的應(yīng)用》相關(guān)的同類信息!
  • 本頁收集關(guān)于解析Facebook的數(shù)據(jù)庫查詢引擎Presto在美團(tuán)的應(yīng)用的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章