主頁 > 知識庫 > 深入剖析美團(tuán)基于Flume的網(wǎng)站日志收集系統(tǒng)

深入剖析美團(tuán)基于Flume的網(wǎng)站日志收集系統(tǒng)

熱門標(biāo)簽:廣西防封卡外呼系統(tǒng)原理是什么 地圖標(biāo)注操作方法 阿里機(jī)器人電銷 電銷外呼系統(tǒng)罵人 浙江呼叫中心外呼系統(tǒng)多少錢 地圖標(biāo)注銷售好做嗎 機(jī)器人電銷哪個牌子好 清遠(yuǎn)語音外呼系統(tǒng)平臺 地圖標(biāo)注標(biāo)記位置導(dǎo)航

美團(tuán)的日志收集系統(tǒng)負(fù)責(zé)美團(tuán)的所有業(yè)務(wù)日志的收集,并分別給Hadoop平臺提供離線數(shù)據(jù)和Storm平臺提供實(shí)時數(shù)據(jù)流。美團(tuán)的日志收集系統(tǒng)基于Flume設(shè)計(jì)和搭建而成。

《基于Flume的美團(tuán)日志收集系統(tǒng)》將分兩部分給讀者呈現(xiàn)美團(tuán)日志收集系統(tǒng)的架構(gòu)設(shè)計(jì)和實(shí)戰(zhàn)經(jīng)驗(yàn)。

第一部分架構(gòu)和設(shè)計(jì),將主要著眼于日志收集系統(tǒng)整體的架構(gòu)設(shè)計(jì),以及為什么要做這樣的設(shè)計(jì)。

第二部分改進(jìn)和優(yōu)化,將主要著眼于實(shí)際部署和使用過程中遇到的問題,對Flume做的功能修改和優(yōu)化等。

1 日志收集系統(tǒng)簡介
日志收集是大數(shù)據(jù)的基石。

許多公司的業(yè)務(wù)平臺每天都會產(chǎn)生大量的日志數(shù)據(jù)。收集業(yè)務(wù)日志數(shù)據(jù),供離線和在線的分析系統(tǒng)使用,正是日志收集系統(tǒng)的要做的事情。高可用性,高可靠性和可擴(kuò)展性是日志收集系統(tǒng)所具有的基本特征。

目前常用的開源日志收集系統(tǒng)有Flume, Scribe等。Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸?shù)南到y(tǒng),目前已經(jīng)是Apache的一個子項(xiàng)目。Scribe是Facebook開源的日志收集系統(tǒng),它為日志的分布式收集,統(tǒng)一處理提供一個可擴(kuò)展的,高容錯的簡單方案。

2 常用的開源日志收集系統(tǒng)對比
下面將對常見的開源日志收集系統(tǒng)Flume和Scribe的各方面進(jìn)行對比。對比中Flume將主要采用Apache下的Flume-NG為參考對象。同時,美團(tuán)將常用的日志收集系統(tǒng)分為三層(Agent層,Collector層和Store層)來進(jìn)行對比。

3 美團(tuán)日志收集系統(tǒng)架構(gòu)
美團(tuán)的日志收集系統(tǒng)負(fù)責(zé)美團(tuán)的所有業(yè)務(wù)日志的收集,并分別給Hadoop平臺提供離線數(shù)據(jù)和Storm平臺提供實(shí)時數(shù)據(jù)流。美團(tuán)的日志收集系統(tǒng)基于Flume設(shè)計(jì)和搭建而成。目前每天收集和處理約T級別的日志數(shù)據(jù)。

下圖是美團(tuán)的日志收集系統(tǒng)的整體框架圖。

a. 整個系統(tǒng)分為三層:Agent層,Collector層和Store層。其中Agent層每個機(jī)器部署一個進(jìn)程,負(fù)責(zé)對單機(jī)的日志收集工作;Collector層部署在中心服務(wù)器上,負(fù)責(zé)接收Agent層發(fā)送的日志,并且將日志根據(jù)路由規(guī)則寫到相應(yīng)的Store層中;Store層負(fù)責(zé)提供永久或者臨時的日志存儲服務(wù),或者將日志流導(dǎo)向其它服務(wù)器。

b. Agent到Collector使用LoadBalance策略,將所有的日志均衡地發(fā)到所有的Collector上,達(dá)到負(fù)載均衡的目標(biāo),同時并處理單個Collector失效的問題。

c. Collector層的目標(biāo)主要有三個:SinkHdfs, SinkKafka和SinkBypass。分別提供離線的數(shù)據(jù)到Hdfs,和提供實(shí)時的日志流到Kafka和Bypass。其中SinkHdfs又根據(jù)日志量的大小分為SinkHdfs_b,SinkHdfs_m和SinkHdfs_s三個Sink,以提高寫入到Hdfs的性能,具體見后面介紹。

d. 對于Store來說,Hdfs負(fù)責(zé)永久地存儲所有日志;Kafka存儲最新的7天日志,并給Storm系統(tǒng)提供實(shí)時日志流;Bypass負(fù)責(zé)給其它服務(wù)器和應(yīng)用提供實(shí)時日志流。

下圖是美團(tuán)的日志收集系統(tǒng)的模塊分解圖,詳解Agent, Collector和Bypass中的Source, Channel和Sink的關(guān)系。

a. 模塊命名規(guī)則:所有的Source以src開頭,所有的Channel以ch開頭,所有的Sink以sink開頭;

b. Channel統(tǒng)一使用美團(tuán)開發(fā)的DualChannel,具體原因后面詳述;對于過濾掉的日志使用NullChannel,具體原因后面詳述;

c. 模塊之間內(nèi)部通信統(tǒng)一使用Avro接口;

4 架構(gòu)設(shè)計(jì)考慮
下面將從可用性,可靠性,可擴(kuò)展性和兼容性等方面,對上述的架構(gòu)做細(xì)致的解析。

4.1 可用性(availablity)
對日志收集系統(tǒng)來說,可用性(availablity)指固定周期內(nèi)系統(tǒng)無故障運(yùn)行總時間。要想提高系統(tǒng)的可用性,就需要消除系統(tǒng)的單點(diǎn),提高系統(tǒng)的冗余度。下面來看看美團(tuán)的日志收集系統(tǒng)在可用性方面的考慮。

4.1.1 Agent死掉
Agent死掉分為兩種情況:機(jī)器死機(jī)或者Agent進(jìn)程死掉。

對于機(jī)器死機(jī)的情況來說,由于產(chǎn)生日志的進(jìn)程也同樣會死掉,所以不會再產(chǎn)生新的日志,不存在不提供服務(wù)的情況。

對于Agent進(jìn)程死掉的情況來說,確實(shí)會降低系統(tǒng)的可用性。對此,美團(tuán)有下面三種方式來提高系統(tǒng)的可用性。首先,所有的Agent在supervise的方式下啟動,如果進(jìn)程死掉會被系統(tǒng)立即重啟,以提供服務(wù)。其次,對所有的Agent進(jìn)行存活監(jiān)控,發(fā)現(xiàn)Agent死掉立即報警。最后,對于非常重要的日志,建議應(yīng)用直接將日志寫磁盤,Agent使用spooldir的方式獲得最新的日志。

4.1.2 Collector死掉
由于中心服務(wù)器提供的是對等的且無差別的服務(wù),且Agent訪問Collector做了LoadBalance和重試機(jī)制。所以當(dāng)某個Collector無法提供服務(wù)時,Agent的重試策略會將數(shù)據(jù)發(fā)送到其它可用的Collector上面。所以整個服務(wù)不受影響。

4.1.3 Hdfs正常停機(jī)
美團(tuán)在Collector的HdfsSink中提供了開關(guān)選項(xiàng),可以控制Collector停止寫Hdfs,并且將所有的events緩存到FileChannel的功能。

4.1.4 Hdfs異常停機(jī)或不可訪問
假如Hdfs異常停機(jī)或不可訪問,此時Collector無法寫Hdfs。由于美團(tuán)使用DualChannel,Collector可以將所收到的events緩存到FileChannel,保存在磁盤上,繼續(xù)提供服務(wù)。當(dāng)Hdfs恢復(fù)服務(wù)以后,再將FileChannel中緩存的events再發(fā)送到Hdfs上。這種機(jī)制類似于Scribe,可以提供較好的容錯性。

4.1.5 Collector變慢或者Agent/Collector網(wǎng)絡(luò)變慢
如果Collector處理速度變慢(比如機(jī)器load過高)或者Agent/Collector之間的網(wǎng)絡(luò)變慢,可能導(dǎo)致Agent發(fā)送到Collector的速度變慢。同樣的,對于此種情況,美團(tuán)在Agent端使用DualChannel,Agent可以將收到的events緩存到FileChannel,保存在磁盤上,繼續(xù)提供服務(wù)。當(dāng)Collector恢復(fù)服務(wù)以后,再將FileChannel中緩存的events再發(fā)送給Collector。

4.1.6 Hdfs變慢
當(dāng)Hadoop上的任務(wù)較多且有大量的讀寫操作時,Hdfs的讀寫數(shù)據(jù)往往變的很慢。由于每天,每周都有高峰使用期,所以這種情況非常普遍。

對于Hdfs變慢的問題,美團(tuán)同樣使用DualChannel來解決。當(dāng)Hdfs寫入較快時,所有的events只經(jīng)過MemChannel傳遞數(shù)據(jù),減少磁盤IO,獲得較高性能。當(dāng)Hdfs寫入較慢時,所有的events只經(jīng)過FileChannel傳遞數(shù)據(jù),有一個較大的數(shù)據(jù)緩存空間。

4.2 可靠性(reliability)
對日志收集系統(tǒng)來說,可靠性(reliability)是指Flume在數(shù)據(jù)流的傳輸過程中,保證events的可靠傳遞。

對Flume來說,所有的events都被保存在Agent的Channel中,然后被發(fā)送到數(shù)據(jù)流中的下一個Agent或者最終的存儲服務(wù)中。那么一個Agent的Channel中的events什么時候被刪除呢?當(dāng)且僅當(dāng)它們被保存到下一個Agent的Channel中或者被保存到最終的存儲服務(wù)中。這就是Flume提供數(shù)據(jù)流中點(diǎn)到點(diǎn)的可靠性保證的最基本的單跳消息傳遞語義。

那么Flume是如何做到上述最基本的消息傳遞語義呢?

首先,Agent間的事務(wù)交換。Flume使用事務(wù)的辦法來保證event的可靠傳遞。Source和Sink分別被封裝在事務(wù)中,這些事務(wù)由保存event的存儲提供或者由Channel提供。這就保證了event在數(shù)據(jù)流的點(diǎn)對點(diǎn)傳輸中是可靠的。在多級數(shù)據(jù)流中,如下圖,上一級的Sink和下一級的Source都被包含在事務(wù)中,保證數(shù)據(jù)可靠地從一個Channel到另一個Channel轉(zhuǎn)移。

其次,數(shù)據(jù)流中 Channel的持久性。Flume中MemoryChannel是可能丟失數(shù)據(jù)的(當(dāng)Agent死掉時),而FileChannel是持久性的,提供類似mysql的日志機(jī)制,保證數(shù)據(jù)不丟失。

4.3 可擴(kuò)展性(scalability)
對日志收集系統(tǒng)來說,可擴(kuò)展性(scalability)是指系統(tǒng)能夠線性擴(kuò)展。當(dāng)日志量增大時,系統(tǒng)能夠以簡單的增加機(jī)器來達(dá)到線性擴(kuò)容的目的。

對于基于Flume的日志收集系統(tǒng)來說,需要在設(shè)計(jì)的每一層,都可以做到線性擴(kuò)展地提供服務(wù)。下面將對每一層的可擴(kuò)展性做相應(yīng)的說明。

4.3.1 Agent層
對于Agent這一層來說,每個機(jī)器部署一個Agent,可以水平擴(kuò)展,不受限制。一個方面,Agent收集日志的能力受限于機(jī)器的性能,正常情況下一個Agent可以為單機(jī)提供足夠服務(wù)。另一方面,如果機(jī)器比較多,可能受限于后端Collector提供的服務(wù),但Agent到Collector是有Load Balance機(jī)制,使得Collector可以線性擴(kuò)展提高能力。

4.3.2 Collector層
對于Collector這一層,Agent到Collector是有Load Balance機(jī)制,并且Collector提供無差別服務(wù),所以可以線性擴(kuò)展。其性能主要受限于Store層提供的能力。

4.3.3 Store層
對于Store這一層來說,Hdfs和Kafka都是分布式系統(tǒng),可以做到線性擴(kuò)展。Bypass屬于臨時的應(yīng)用,只對應(yīng)于某一類日志,性能不是瓶頸。

4.4 Channel的選擇
Flume1.4.0中,其官方提供常用的MemoryChannel和FileChannel供大家選擇。其優(yōu)劣如下:

MemoryChannel: 所有的events被保存在內(nèi)存中。優(yōu)點(diǎn)是高吞吐。缺點(diǎn)是容量有限并且Agent死掉時會丟失內(nèi)存中的數(shù)據(jù)。
FileChannel: 所有的events被保存在文件中。優(yōu)點(diǎn)是容量較大且死掉時數(shù)據(jù)可恢復(fù)。缺點(diǎn)是速度較慢。
上述兩種Channel,優(yōu)缺點(diǎn)相反,分別有自己適合的場景。然而,對于大部分應(yīng)用來說,美團(tuán)希望Channel可以同提供高吞吐和大緩存?;诖耍缊F(tuán)開發(fā)了DualChannel。

DualChannel:基于 MemoryChannel和 FileChannel開發(fā)。當(dāng)堆積在Channel中的events數(shù)小于閾值時,所有的events被保存在MemoryChannel中,Sink從MemoryChannel中讀取數(shù)據(jù); 當(dāng)堆積在Channel中的events數(shù)大于閾值時, 所有的events被自動存放在FileChannel中,Sink從FileChannel中讀取數(shù)據(jù)。這樣當(dāng)系統(tǒng)正常運(yùn)行時,美團(tuán)可以使用MemoryChannel的高吞吐特性;當(dāng)系統(tǒng)有異常時,美團(tuán)可以利用FileChannel的大緩存的特性。
4.5 和scribe兼容
在設(shè)計(jì)之初,美團(tuán)就要求每類日志都有一個category相對應(yīng),并且Flume的Agent提供AvroSource和ScribeSource兩種服務(wù)。這將保持和之前的Scribe相對應(yīng),減少業(yè)務(wù)的更改成本。

4.6 權(quán)限控制
在目前的日志收集系統(tǒng)中,美團(tuán)只使用最簡單的權(quán)限控制。只有設(shè)定的category才可以進(jìn)入到存儲系統(tǒng)。所以目前的權(quán)限控制就是category過濾。

如果權(quán)限控制放在Agent端,優(yōu)勢是可以較好地控制垃圾數(shù)據(jù)在系統(tǒng)中流轉(zhuǎn)。但劣勢是配置修改麻煩,每增加一個日志就需要重啟或者重載Agent的配置。

如果權(quán)限控制放在Collector端,優(yōu)勢是方便進(jìn)行配置的修改和加載。劣勢是部分沒有注冊的數(shù)據(jù)可能在Agent/Collector之間傳輸。

考慮到Agent/Collector之間的日志傳輸并非系統(tǒng)瓶頸,且目前日志收集屬內(nèi)部系統(tǒng),安全問題屬于次要問題,所以選擇采用Collector端控制。

4.7 提供實(shí)時流
美團(tuán)的部分業(yè)務(wù),如實(shí)時推薦,反爬蟲服務(wù)等服務(wù),需要處理實(shí)時的數(shù)據(jù)流。因此美團(tuán)希望Flume能夠?qū)С鲆环輰?shí)時流給Kafka/Storm系統(tǒng)。

一個非常重要的要求是實(shí)時數(shù)據(jù)流不應(yīng)該受到其它Sink的速度影響,保證實(shí)時數(shù)據(jù)流的速度。這一點(diǎn),美團(tuán)是通過Collector中設(shè)置不同的Channel進(jìn)行隔離,并且DualChannel的大容量保證了日志的處理不受Sink的影響。

5 系統(tǒng)監(jiān)控
對于一個大型復(fù)雜系統(tǒng)來說,監(jiān)控是必不可少的部分。設(shè)計(jì)合理的監(jiān)控,可以對異常情況及時發(fā)現(xiàn),只要有一部手機(jī),就可以知道系統(tǒng)是否正常運(yùn)作。對于美團(tuán)的日志收集系統(tǒng),美團(tuán)建立了多維度的監(jiān)控,防止未知的異常發(fā)生。

5.1 發(fā)送速度,擁堵情況,寫Hdfs速度
通過發(fā)送給zabbix的數(shù)據(jù),美團(tuán)可以繪制出發(fā)送數(shù)量、擁堵情況和寫Hdfs速度的圖表,對于超預(yù)期的擁堵,美團(tuán)會報警出來查找原因。

下面是Flume Collector HdfsSink寫數(shù)據(jù)到Hdfs的速度截圖:

下面是Flume Collector的FileChannel中擁堵的events數(shù)據(jù)量截圖:

5.2 flume寫hfds狀態(tài)的監(jiān)控
Flume寫入Hdfs會先生成tmp文件,對于特別重要的日志,美團(tuán)會每15分鐘左右檢查一下各個Collector是否都產(chǎn)生了tmp文件,對于沒有正常產(chǎn)生tmp文件的Collector和日志美團(tuán)需要檢查是否有異常。這樣可以及時發(fā)現(xiàn)Flume和日志的異常.

5.3 日志大小異常監(jiān)控
對于重要的日志,美團(tuán)會每個小時都監(jiān)控日志大小周同比是否有較大波動,并給予提醒,這個報警有效的發(fā)現(xiàn)了異常的日志,且多次發(fā)現(xiàn)了應(yīng)用方日志發(fā)送的異常,及時給予了對方反饋,幫助他們及早修復(fù)自身系統(tǒng)的異常。

通過上述的講解,美團(tuán)可以看到,基于Flume的美團(tuán)日志收集系統(tǒng)已經(jīng)是具備高可用性,高可靠性,可擴(kuò)展等特性的分布式服務(wù)。

改進(jìn)和優(yōu)化
下面,美團(tuán)將會講述在實(shí)際部署和使用過程中遇到的問題,對Flume的功能改進(jìn)和對系統(tǒng)做的優(yōu)化。

1 Flume的問題總結(jié)
在Flume的使用過程中,遇到的主要問題如下:

a. Channel“水土不服”:使用固定大小的MemoryChannel在日志高峰時常報隊(duì)列大小不夠的異常;使用FileChannel又導(dǎo)致IO繁忙的問題;

b. HdfsSink的性能問題:使用HdfsSink向Hdfs寫日志,在高峰時間速度較慢;

c. 系統(tǒng)的管理問題:配置升級,模塊重啟等;

2 Flume的功能改進(jìn)和優(yōu)化點(diǎn)
從上面的問題中可以看到,有一些需求是原生Flume無法滿足的,因此,基于開源的Flume美團(tuán)增加了許多功能,修改了一些Bug,并且進(jìn)行一些調(diào)優(yōu)。下面將對一些主要的方面做一些說明。

2.1 增加Zabbix monitor服務(wù)
一方面,F(xiàn)lume本身提供了http, ganglia的監(jiān)控服務(wù),而美團(tuán)目前主要使用zabbix做監(jiān)控。因此,美團(tuán)為Flume添加了zabbix監(jiān)控模塊,和sa的監(jiān)控服務(wù)無縫融合。

另一方面,凈化Flume的metrics。只將美團(tuán)需要的metrics發(fā)送給zabbix,避免 zabbix server造成壓力。目前美團(tuán)最為關(guān)心的是Flume能否及時把應(yīng)用端發(fā)送過來的日志寫到Hdfs上, 對應(yīng)關(guān)注的metrics為:

Source : 接收的event數(shù)和處理的event數(shù)
Channel : Channel中擁堵的event數(shù)
Sink : 已經(jīng)處理的event數(shù)


2.2 為HdfsSink增加自動創(chuàng)建index功能
首先,美團(tuán)的HdfsSink寫到hadoop的文件采用lzo壓縮存儲。 HdfsSink可以讀取hadoop配置文件中提供的編碼類列表,然后通過配置的方式獲取使用何種壓縮編碼,美團(tuán)目前使用lzo壓縮數(shù)據(jù)。采用lzo壓縮而非bz2壓縮,是基于以下測試數(shù)據(jù):

其次,美團(tuán)的HdfsSink增加了創(chuàng)建lzo文件后自動創(chuàng)建index功能。Hadoop提供了對lzo創(chuàng)建索引,使得壓縮文件是可切分的,這樣Hadoop Job可以并行處理數(shù)據(jù)文件。HdfsSink本身lzo壓縮,但寫完lzo文件并不會建索引,美團(tuán)在close文件之后添加了建索引功能。

Java Code復(fù)制內(nèi)容到剪貼板
  1.   /**  
  2.    * Rename bucketPath file from .tmp to permanent location.  
  3.    */  
  4.   private void renameBucket() throws IOException, InterruptedException {   
  5.       if(bucketPath.equals(targetPath)) {   
  6.               return;   
  7.         }   
  8.   
  9.         final Path srcPath = new Path(bucketPath);   
  10.         final Path dstPath = new Path(targetPath);   
  11.   
  12.         callWithTimeout(new CallRunnerObject>() {   
  13.               @Override  
  14.               public Object call() throws Exception {   
  15.                 if(fileSystem.exists(srcPath)) { // could block   
  16.                       LOG.info("Renaming " + srcPath + " to " + dstPath);   
  17.                      fileSystem.rename(srcPath, dstPath); // could block   
  18.   
  19.                       //index the dstPath lzo file   
  20.                       if (codeC != null  ".lzo".equals(codeC.getDefaultExtension()) ) {   
  21.                               LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());   
  22.                               lzoIndexer.index(dstPath);   
  23.                       }   
  24.                 }   
  25.                 return null;   
  26.               }   
  27.     });   
  28. }  


2.3 增加HdfsSink的開關(guān)
美團(tuán)在HdfsSink和DualChannel中增加開關(guān),當(dāng)開關(guān)打開的情況下,HdfsSink不再往Hdfs上寫數(shù)據(jù),并且數(shù)據(jù)只寫向DualChannel中的FileChannel。以此策略來防止Hdfs的正常停機(jī)維護(hù)。

2.4 增加DualChannel
Flume本身提供了MemoryChannel和FileChannel。MemoryChannel處理速度快,但緩存大小有限,且沒有持久化;FileChannel則剛好相反。美團(tuán)希望利用兩者的優(yōu)勢,在Sink處理速度夠快,Channel沒有緩存過多日志的時候,就使用MemoryChannel,當(dāng)Sink處理速度跟不上,又需要Channel能夠緩存下應(yīng)用端發(fā)送過來的日志時,就使用FileChannel,由此美團(tuán)開發(fā)了DualChannel,能夠智能的在兩個Channel之間切換。

其具體的邏輯如下:

Java Code復(fù)制內(nèi)容到剪貼板
  1. /***  
  2.  * putToMemChannel indicate put event to memChannel or fileChannel  
  3.  * takeFromMemChannel indicate take event from memChannel or fileChannel  
  4.  * */  
  5. private AtomicBoolean putToMemChannel = new AtomicBoolean(true);   
  6. private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);   
  7.   
  8. void doPut(Event event) {   
  9.         if (switchon  putToMemChannel.get()) {   
  10.               //往memChannel中寫數(shù)據(jù)   
  11.               memTransaction.put(event);   
  12.   
  13.               if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {   
  14.                 putToMemChannel.set(false);   
  15.               }   
  16.         } else {   
  17.               //往fileChannel中寫數(shù)據(jù)   
  18.               fileTransaction.put(event);   
  19.         }   
  20.   }   
  21.   
  22. Event doTake() {   
  23.     Event event = null;   
  24.     if ( takeFromMemChannel.get() ) {   
  25.         //從memChannel中取數(shù)據(jù)   
  26.         event = memTransaction.take();   
  27.         if (event == null) {   
  28.             takeFromMemChannel.set(false);   
  29.         }    
  30.     } else {   
  31.         //從fileChannel中取數(shù)據(jù)   
  32.         event = fileTransaction.take();   
  33.         if (event == null) {   
  34.             takeFromMemChannel.set(true);   
  35.   
  36.             putToMemChannel.set(true);   
  37.         }    
  38.     }   
  39.     return event;   
  40. }  


2.5 增加NullChannel
Flume提供了NullSink,可以把不需要的日志通過NullSink直接丟棄,不進(jìn)行存儲。然而,Source需要先將events存放到Channel中,NullSink再將events取出扔掉。為了提升性能,美團(tuán)把這一步移到了Channel里面做,所以開發(fā)了NullChannel。

2.6 增加KafkaSink
為支持向Storm提供實(shí)時數(shù)據(jù)流,美團(tuán)增加了KafkaSink用來向Kafka寫實(shí)時數(shù)據(jù)流。其基本的邏輯如下:

Java Code復(fù)制內(nèi)容到剪貼板
  1. public class KafkaSink extends AbstractSink implements Configurable {   
  2.         private String zkConnect;   
  3.         private Integer zkTimeout;   
  4.         private Integer batchSize;   
  5.         private Integer queueSize;   
  6.         private String serializerClass;   
  7.         private String producerType;   
  8.         private String topicPrefix;   
  9.   
  10.         private ProducerString, String> producer;   
  11.   
  12.         public void configure(Context context) {   
  13.             //讀取配置,并檢查配置   
  14.         }   
  15.   
  16.         @Override  
  17.         public synchronized void start() {   
  18.             //初始化producer   
  19.         }   
  20.   
  21.         @Override  
  22.         public synchronized void stop() {   
  23.             //關(guān)閉producer   
  24.         }   
  25.   
  26.         @Override  
  27.         public Status process() throws EventDeliveryException {   
  28.   
  29.             Status status = Status.READY;   
  30.   
  31.             Channel channel = getChannel();   
  32.             Transaction tx = channel.getTransaction();   
  33.             try {   
  34.                     tx.begin();   
  35.   
  36.                     //將日志按category分隊(duì)列存放   
  37.                     MapString, ListString>> topic2EventList = new HashMapString, ListString>>();   
  38.   
  39.                     //從channel中取batchSize大小的日志,從header中獲取category,生成topic,并存放于上述的Map中;   
  40.   
  41.                     //將Map中的數(shù)據(jù)通過producer發(fā)送給kafka    
  42.   
  43.                    tx.commit();   
  44.             } catch (Exception e) {   
  45.                     tx.rollback();   
  46.                     throw new EventDeliveryException(e);   
  47.             } finally {   
  48.                 tx.close();   
  49.             }   
  50.             return status;   
  51.         }   
  52. }  


2.7 修復(fù)和scribe的兼容問題
Scribed在通過ScribeSource發(fā)送數(shù)據(jù)包給Flume時,大于4096字節(jié)的包,會先發(fā)送一個Dummy包檢查服務(wù)器的反應(yīng),而Flume的ScribeSource對于logentry.size()=0的包返回TRY_LATER,此時Scribed就認(rèn)為出錯,斷開連接。這樣循環(huán)反復(fù)嘗試,無法真正發(fā)送數(shù)據(jù)?,F(xiàn)在在ScribeSource的Thrift接口中,對size為0的情況返回OK,保證后續(xù)正常發(fā)送數(shù)據(jù)。

3. Flume系統(tǒng)調(diào)優(yōu)經(jīng)驗(yàn)總結(jié)
3.1 基礎(chǔ)參數(shù)調(diào)優(yōu)經(jīng)驗(yàn)
HdfsSink中默認(rèn)的serializer會每寫一行在行尾添加一個換行符,美團(tuán)日志本身帶有換行符,這樣會導(dǎo)致每條日志后面多一個空行,修改配置不要自動添加換行符;
lc.sinks.sink_hdfs.serializer.appendNewline = false
調(diào)大MemoryChannel的capacity,盡量利用MemoryChannel快速的處理能力;
調(diào)大HdfsSink的batchSize,增加吞吐量,減少hdfs的flush次數(shù);
適當(dāng)調(diào)大HdfsSink的callTimeout,避免不必要的超時錯誤;


3.2 HdfsSink獲取Filename的優(yōu)化
HdfsSink的path參數(shù)指明了日志被寫到Hdfs的位置,該參數(shù)中可以引用格式化的參數(shù),將日志寫到一個動態(tài)的目錄中。這方便了日志的管理。例如美團(tuán)可以將日志寫到category分類的目錄,并且按天和按小時存放:

lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H
HdfsS ink中處理每條event時,都要根據(jù)配置獲取此event應(yīng)該寫入的Hdfs path和filename,默認(rèn)的獲取方法是通過正則表達(dá)式替換配置中的變量,獲取真實(shí)的path和filename。因?yàn)榇诉^程是每條event都要做的操作,耗時很長。通過美團(tuán)的測試,20萬條日志,這個操作要耗時6-8s左右。

由于美團(tuán)目前的path和filename有固定的模式,可以通過字符串拼接獲得。而后者比正則匹配快幾十倍。拼接定符串的方式,20萬條日志的操作只需要幾百毫秒。

3.3 HdfsSink的b/m/s優(yōu)化
在美團(tuán)初始的設(shè)計(jì)中,所有的日志都通過一個Channel和一個HdfsSink寫到Hdfs上。美團(tuán)來看一看這樣做有什么問題。

首先,美團(tuán)來看一下HdfsSink在發(fā)送數(shù)據(jù)的邏輯:

Java Code復(fù)制內(nèi)容到剪貼板
  1. //從Channel中取batchSize大小的events   
  2. for (txnEventCount = 0; txnEventCount  batchSize; txnEventCount++) {   
  3.     //對每條日志根據(jù)category append到相應(yīng)的bucketWriter上;   
  4.     bucketWriter.append(event);   
  5. }   
  6.   
  7. for (BucketWriter bucketWriter : writers) {   
  8.     //然后對每一個bucketWriter調(diào)用相應(yīng)的flush方法將數(shù)據(jù)flush到Hdfs上   
  9.     bucketWriter.flush();   
  10. }  

假設(shè)美團(tuán)的系統(tǒng)中有100個category,batchSize大小設(shè)置為20萬。則每20萬條數(shù)據(jù),就需要對100個文件進(jìn)行append或者flush操作。

其次,對于美團(tuán)的日志來說,基本符合80/20原則。即20%的category產(chǎn)生了系統(tǒng)80%的日志量。這樣對大部分日志來說,每20萬條可能只包含幾條日志,也需要往Hdfs上flush一次。

上述的情況會導(dǎo)致HdfsSink寫Hdfs的效率極差。下圖是單Channel的情況下每小時的發(fā)送量和寫hdfs的時間趨勢圖。

鑒于這種實(shí)際應(yīng)用場景,美團(tuán)把日志進(jìn)行了大小歸類,分為big, middle和small三類,這樣可以有效的避免小日志跟著大日志一起頻繁的flush,提升效果明顯。下圖是分隊(duì)列后big隊(duì)列的每小時的發(fā)送量和寫hdfs的時間趨勢圖。

4 未來發(fā)展
目前,F(xiàn)lume日志收集系統(tǒng)提供了一個高可用,高可靠,可擴(kuò)展的分布式服務(wù),已經(jīng)有效地支持了美團(tuán)的日志數(shù)據(jù)收集工作。

后續(xù),美團(tuán)將在如下方面繼續(xù)研究:

日志管理系統(tǒng):圖形化的展示和控制日志收集系統(tǒng);
跟進(jìn)社區(qū)發(fā)展:跟進(jìn)Flume 1.5的進(jìn)展,同時回饋社區(qū);

標(biāo)簽:沈陽 雅安 臺灣 德宏 包頭 廊坊 伊春 江蘇

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《深入剖析美團(tuán)基于Flume的網(wǎng)站日志收集系統(tǒng)》,本文關(guān)鍵詞  深入,剖析,美團(tuán),基于,Flume,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《深入剖析美團(tuán)基于Flume的網(wǎng)站日志收集系統(tǒng)》相關(guān)的同類信息!
  • 本頁收集關(guān)于深入剖析美團(tuán)基于Flume的網(wǎng)站日志收集系統(tǒng)的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章