導讀:Flink是目前流式處理領域的熱門引擎,具備高吞吐、低延遲的特點,在實時數倉、實時風控、實時推薦等多個場景有著廣泛的應用。京東於2018年開始基於Flink+K8s深入打造高效能、穩定、可靠、易用的實時計算平臺,支撐了京東內部多條業務線平穩度過618、雙11多次大促。本次講演將分享京東Flink計算平臺在容器化實踐過程中遇到的問題和方案,在效能、穩定性、易用性等方面對社群版Flink所做的深入的定製和最佳化,以及未來的展望和規劃。主要內容包括:
- 實時計算演進
- Flink容器化實踐
- Flink最佳化改進
- 未來規劃
01實時計算引進1.發展歷程
最初大資料的模式基本都是T+1,但是隨著業務發展,對資料實時性的要求越來越高,比如對於一個數據,希望能夠在分鐘級甚至秒級得到計算結果。京東是在2014年開始基於Storm打造第一代流式計算平臺,並在Storm的基礎上,做了很多最佳化改進,比如基於cgroup實現對worker使用資源的隔離、網路傳輸壓縮最佳化、引入任務粒度toplogy master分擔zk壓力等。到2016年,Storm已經成為京東內部流式處理的最主要的計算引擎,服務於各個業務線,可以達到比較高的實時性。
隨著業務規模的不斷擴大,Storm也暴露出許多問題,特別是對於吞吐量巨大、但是對於延遲不是那麼敏感的業務場景顯得力不從心。於是,京東在2017年引入了Spark Streaming流式計算引擎,用於滿足此類場景業務需要。
隨著業務的發展,不光是對於資料的延遲有很高要求,同時對於資料的吞吐處理能力也有很高的要求,所以迫切需要一個兼具低延遲和高吞吐能力的計算框架,於是在2018年我們引入了Flink。在Flink社群版的基礎上,我們從效能、穩定性、易用性還有功能等方面,都做了一些深入的定製和最佳化。同時我們基於k8s實現了實時計算全面的容器化,因為容器化有很多的優點,它可以做到很好的資源隔離,同時它有一個很強的自愈能力,另外它很容易實現資源的彈性排程。同時我們基於Flink打造了全新的SQL平臺,降低使用者開發實時計算應用的門檻。
到2020年,基於Flink和k8s實時計算平臺已經做的比較完善了。過去流式處理是我們關注的重點,今年我們也開始逐漸的支援批處理,朝著批流一體的方向演進。另外AI是目前比較火的一個方向,對於AI來說,它的實時化也是一個重要的研究方向。所以我們的實時計算平臺將會朝著批流一體和AI的方向進行發展。
2.平臺架構
上面是京東實時計算平臺JRC的整體架構,整個架構以定製化改造後的Flink為核心,Flink執行在K8S上,狀態儲存在HDFS叢集上,透過Zookeeper保證叢集的高可用。支援流式源JDQ(京東基於Kafka深入定製實現的實時資料匯流排)和Hive,資料主要寫入JimDB(京東記憶體資料庫)、ES、Hbase和京東OLAP。計算平臺支援SQL和普通JAR包兩種方式的作業,具有配置、部署、除錯、監控、和日誌處理等功能。
3. 業務場景
京東Flink服務於京東內部非常多的業務線,有70多個一級部門在使用,主要應用場景包括實時數倉,實時大屏,實時推薦,實時報表,實時風控和實時監控,當然還有其他一些應用場景。對資料計算實時性有一定要求的場景,一般都會使用Flink進行開發。
4. 業務規模
京東Flink叢集目前由5000多臺物理機組成,它服務了京東內部70多個一級業務部門,目前線上的流計算任務大概有3000多個,資料的處理能力可以達到每分鐘數十億甚至更高。
02Flink容器化實踐
1.容器化歷程
京東從2018年開始進行計算引擎的容器化改造,2019年初已經實現計算單元全部容器化,2020年進行了容器化方案升級,使用native k8s實現計算資源的彈性擴容。容器化改造的好處是提升了資源使用率,提高了研發效率,增強了業務穩定性,減少了運維部署成本。
2.容器化方案
舊的容器化方案是基於k8s Deployment部署的Standalone Session叢集,它需要事先預估出叢集所需資源,比如需要的JobManager和TaskManager的資源規格和個數。然後JRC平臺透過K8S客戶端向K8S Master提出請求,建立JobManager的Deployment和TaskManager的Deployment。其中使用ZK保證高可用,使用Hdfs實現狀態儲存,使用Prometheus實現監控指標的上傳,結合Grafana實現指標的直觀展示。叢集使用ES儲存日誌,方便日誌的查詢。
3.容器化遇到的問題&對策
容器化過程中可能遇到很多問題:
① JM/TM故障自動恢復
應用部署在容器中,當應用出現異常時,如何發現應用或者異常的情況呢?比如可以使用存活探針,編寫檢測指令碼定期讀取應用的心跳資訊。當檢測到Pod處於不健康狀態時,可以採用k8s的重啟機制來重啟不健康的容器。
② 減少Pod異常對業務影響
在k8s中由於硬體異常、資源過載、Pod不健康等問題會導致Pod被驅逐或自動重啟,Pod重啟時勢必會影響到該Pod上分佈計算任務的正常執行。這個時候可以考慮採用適當的重啟策略、改造核心等方案來減少對任務影響。比如京東實現了JM Failover最佳化,當Pod異常引起JM Failover時採用的是任務不恢復、重建任務狀態恢復的方式,可以一定程度上減少Pod重啟對業務帶來的影響。
③ 效能問題
在容器環境下,JVM對cpu和記憶體的感知會有一定的問題,在Java8版本中,一些引數就要進行顯式的設定。對於機器效能差異或熱點等問題導致部分Pod計算慢的問題,可以考慮進行針對性最佳化(比如實現基於負載的資料分發)或處理(比如檢測到計算慢的Pod將其驅逐到負載較低的機器)。此外,對於使用容器網路的情況下,可能會帶來一定的網路效能損耗,此時可以根據情況選擇使用主機網路避免網路虛擬化帶來的開銷,或者選擇更高效能的網路外掛。
④ 重要業務穩定性
如何保證業務的穩定性是一個需要重點考慮的問題。除了保證系統各個環節的高可用外,還可以根據業務情況考慮使用其它合理的方案,例如業務分級管理,獨立資源池,多機房互備等。
4.容器化方案升級(Native k8s)
原有容器化方案存在一定的問題:
- 資源需要提前分配
- 無法實現資源彈性伸縮
- 極端場景下Pod不能正常拉起,影響任務恢復
- 重要業務穩定性
容器化升級的解決方案是採用Native K8s的方式。由JRC平臺先向K8S Master發出請求,建立JobManager的Deployment;然後在使用者透過Rest服務提交任務後,由JobMaster透過JDResourceManager 向JRC平臺發出請求,然後JRC平臺向 K8s Master 動態申請資源去建立執行TaskManager 的Pod。
此處,透過引入JRC平臺與K8s互動,遮蔽了不同容器平臺的差異,解耦了映象與平臺叢集配置&邏輯變化。另外,為了相容原有Slot分配策略,在提交任務時會預估出任務所需資源並一次性申請,之後採用等待一定時間後進行slot分配的方式達到相容目的。
03Flink最佳化改進
主要做了以下四個方面的最佳化:
- 效能
- 穩定性
- 易用性
- 功能擴充套件
下邊分幾個重要的點進行講解:
1.預覽拓撲
預覽拓撲主要是為了解決業務的一些痛點:比如任務調優繁瑣、SQL任務無法指定並行度、任務需要的額Slot數不清楚、並行度調整後網路buffer不足等。在Flink任務除錯階段,對任務並行度、Slot分組、Chaining策略的調整是個反覆的過程,如果把引數寫到命令列就太繁瑣了。而基於預覽拓撲就可以很方便地對這些引數進行配置。
預覽拓撲基本的實現方案如上圖:使用者提交JAR包後可根據JAR包生成對應的拓撲圖,之後使用者根據拓撲圖可以進行線上調整,最後自動將修改後的配置和原來的JAR包一起進行任務提交。
預覽拓撲機制使得不修改程式多次提交任務調優成為可能,但是如何保證前後兩次提交生成運算元穩定的對應關係呢?解決方案的關鍵是保證運算元有穩定的唯一身份標識,具體演算法是:如果運算元指定了uidHash就用uidHash,如果運算元指定了uid就使用uid,否則就從source開始廣度優先遍歷,利用運算元在graph中的位置生成一個穩定hash值。
2.背壓量化
第二個重要的最佳化是背壓量化。
在Flink開發的時候,主要有兩種方式:
①透過Flink UI背壓面板觀察是否背壓。使用這種方式在某些場景比較方便,但是它存在幾個問題:
- 在有些場景下采集不到背壓
- 對於歷史背壓情況無法跟蹤
- 背壓影響不直觀
- 大並行度時背壓採集壓力
②透過任務背壓相關指標進行觀察和分析,透過將指標定期採集並存儲起來,可以進行實時或歷史的背壓分析。但是它也有一些不足的地方:
- 不同Flink版本中指標含義有一定差異
- 分析背壓有一定門檻,需要對於指標含義有深入理解,聯合進行分析
- 背壓未量化,對業務影響程度不夠直觀
京東的解決方案是採集背壓發生的位置、時間和次數指標,並對這些指標進行上報儲存。同時對量化的背壓指標結合執行時拓撲,可以精確反映發生背壓現場的情況。
3.HDFS最佳化
隨著業務數量的增多,HDFS叢集的壓力就會變得很大。這會直接導致RPC響應時間變慢,造成請求堆積,同時大量小檔案也會對NN記憶體造成很大壓力。對此京東嘗試的解決方案有4方面:限制checkpoint最小間隔,時間最小設定在1min左右可以滿足大部分業務需求;進行小檔案合併;降低cp建立和刪除時的hdfs rpc請求;HDFS叢集多ns分散均衡壓力。
4.網路分發最佳化
在實踐過程中我們發現,即使業務使用了rebalance並且對任務進行了打散分佈,但是由於機器處理能力和負載的差異,會導致任務各個並行度不同程式的背壓表現,嚴重影響了任務的效能。為此,我們開發了基於負載的動態rebalance,在資料進行分發時優先選擇下游負載最小的channel進行分發。
經測試,在特定場景下效能能夠提升近一倍。
5.ZK防抖
目前一般都是使用ZK叢集實現Flink叢集的高可用,但是當網路抖動、機器繁忙、ZK叢集暫時無響應或運維機器的時候,都可能會導致任務重啟。
任務重啟的原因是由於在這些場景發生時,Curator會將狀態設定為suspended,並且Curator認為suspended為Error狀態,從而會釋放leader,Flink發現notleader後會revokeLeadership,從而造成任務重啟。
一個可行的解決辦法是升級Curator的版本,同時將connectionStateErrorPolicy設定為SessionConnetionStateErrorPolicy。
6.日誌分離
目前我們一個叢集是支援跑多個任務的,這時日誌會出現的問題是:任務的日誌和叢集Framework日誌混在一起,同時叢集的多個任務日誌也是混在一起的,不太方便使用者檢視日誌,快速定位問題。
為了解決這個問題,首先要弄清楚目前Flink載入日誌框架的基本機制:為了避免跟業務Job中可能包含的日誌框架的依賴、配置檔案產生衝突,Flink日誌相關類的載入都代理給TaskManager框架的類載入器,也就是Parent Classloader,而框架載入的這些類都是從Flink安裝包的lib目錄下載入的。對於日誌配置檔案,Flink透過 JVM 啟動引數來指定配置日誌配置檔案路徑。
日誌分離的解決方案是:將日誌相關jar包加入到各個task自己classloader(user classloader)的類路徑中;同時確保使用user classloader載入日誌類和載入自己的日誌配置;
另外對於使用了Flink框架的類(比如PrintSinkFunction),日誌不能做到很好的分離,可以考慮使用logback MDC機制。
04未來規劃
未來規劃主要包括四個方面:
① 統一計算引擎
引擎Storm全部升級為Flink,這樣可以減少平臺的運維成本,同時可以提高作業效能(目前已經接近完成)。
② 更多SQL作業
持續完善SQL平臺,降低使用者的使用門檻,推動使用者更多使用SQL開發作業。
③ 智慧運維
使用智慧診斷,自適應調整執行引數,提升任務的魯棒性
④ 批流一體
深度打造批流一體實時計算平臺,兼具低延遲的流處理和高效能的批處理能力。另外統一架構,實現程式碼複用,降低使用者的使用成本。
今天的分享就到這裡,謝謝大家。
嘉賓介紹:
付海濤
京東 | 高階技術專家擁有多年中介軟體、網際網路雲平臺和大資料開發經驗,對分散式計算、容器、微服務有較深入的理解。2018年加入京東,主要負責實時計算引擎storm、flink的相關最佳化和開發工作。
分享嘉賓:付海濤 京東 高階技術專家
編輯整理:趙明明
出品平臺:DataFunTalk