![]()
作者 | 作業(yè)幫大數據團隊(劉澤強、孫建業(yè))
本文主要分享 25 年 Flink on k8s 的探索與實踐,包括選型思考、平臺架構演進、日志觀測、Flink 版本升級、兼容性適配、工具遷移、穩(wěn)定性和性能優(yōu)化等關鍵內容。
歷史背景
作業(yè)幫實時計算主要基于 Flink 構建,共有 3000 多個任務,均采用 Per-Job 模式部署在 Yarn 集群,因 sla 要求差異不同部門間集群獨立。歷史 on Yarn 模式主要面對問題如下。
資源隔離粒度粗。Yarn 集群本質通過內存隔離,因業(yè)務線差異,流量類 Flink 任務處理大量字符串匹配消耗 cpu 多,導致同集群其他任務吞吐下降,出現延遲;
資源利用率低。無論采用多集群部署還是單集群 Node Label 方式,都需要預留資源防止機器故障、新增任務等情況,導致資源利用率偏低。同時各部門場景差異,集群資源使用特點不同,資源共池管理、任務間資源強隔離后可實現互補。例如流量類場景會做大量字符串匹配 cpu 消耗高,簡單數據同步 cpu 消耗低等;
平臺邏輯復雜。部分業(yè)務有高可用需求,我們部署兩套 Yarn 集群作為主備,狀態(tài)存儲在對象存儲,Flink 任務托管平臺提供切換能力,單集群異常時自動切換。這種模式調度邏輯非常復雜,同時備用資源存在浪費;
選型思考
任務提交方式
目前行業(yè)中有多種提交方式。首先 Standalone 模式,這種模式資源靜態(tài)分配,Flink 任務擴縮繁瑣。其次是 Native 模式,Flink 原生支持,通過構造 Flink 命令提交任務,在平臺層可根據多版本歷史任務情況靈活適配。核心邏輯是 Flink 內置了 Kubernetes client,直接和 k8s api server 服務通訊,創(chuàng)建 JobManager 部署。Flink 的 ResourceManager 將與 k8s api server 服務器通信,動態(tài)按需分配和釋放 TaskManager pod,與 Yarn 原理類似。還有 Flink Operator 模式,更貼近 k8s 生態(tài),通過構造 Flink CR 提交給 k8s api server,Operator Observer 通過 k8s api 檢查 JM Deployment 狀態(tài)、Flink Rest api 獲取 job 指標等信息,Validate 根據預設規(guī)則檢查用戶提交 spec 是否合法,Reconciler 對比“觀察到”的狀態(tài)與“期望”的狀態(tài)決定操作動作,Updater 將最終的執(zhí)行結果和新的狀態(tài)再次同步給 K8s API Server,Operator 托管了 Flink 任務完整生命周期。對平臺而言復雜度大大降低,同時 Operator 模式還兼顧動態(tài)擴縮能力,為實時離線資源彈性提供基礎。最終采用了 Flink K8s Operator 模式。
![]()
任務狀態(tài)觀測
Flink 任務可觀測性主要分為如下幾個方面
Flink 運行態(tài) WebUI:通過 Ingress Service 來代理實現;
Flink 歷史日志查看:通過部署 LogAgent 收集 pod 日志到 kafka,然后落到對象存儲中,通過 LogSearch 檢索。LogAgent 和 LogSearch 復用了在線業(yè)務日志能力。
Flink Metrics 監(jiān)控:在官方 PrometheusReporter 的基礎上增加了 discovery 的功能。Container 的 HTTPServer 啟動后,把對應的 ip:port 以臨時節(jié)點的形式注冊到 zk 上,然后利用 Prometheus 的 discover targets 監(jiān)聽 zk 節(jié)點的變化。由于是臨時節(jié)點,Container 銷毀時節(jié)點消失,Prometheus 感知后停止抓取。
K8S Events 信息:通過 k8s watch 機制訂閱關鍵事件進行存儲;
![]()
環(huán)境資源和成本
作業(yè)幫大數據是建立在多個公有云半托管 EMR 上的,Flink on yarn 模式轉變?yōu)?k8s 時切換到了公有云的 k8s 產品。Flink 任務雖然是兩階段資源申請,但整體形態(tài)和在線業(yè)務場景很像,都是長時服務,在調度方面已有能力基本都能滿足需求。Flink 整體資源規(guī)模萬核左右,pod 個數大概 1.3 萬,常規(guī) k8s 集群調度效率百級每秒,全部任務 1min 左右拉起來,性能滿足需求。
考慮降本效果,單云一個 K8s 集群,采用固定節(jié)點和公有云 serverless 兜底的方式最大化消除 buffer 資源。每個業(yè)務部門一個 Flink k8s Operator 和 namespace 來代替 Yarn 隊列概念,通過 quota 控制資源使用上限。為保障單節(jié)點利用效率 request = 0.1*slot(1 slot 等于 1core), limit = request 超用系數。超用系數根據 Flink 任務特點、穩(wěn)定性、節(jié)點負載等情況調節(jié)。業(yè)務成本分攤由按集群、隊列方式轉變?yōu)楦鶕蝿諏嶋H占用情況分攤,優(yōu)化任務后降本效果立刻體現,避免了平臺和業(yè)務的溝通、運維成本;
技術方案
平臺整體架構
核心兩個方面變化,一個是調度服務解除了 Flink 生命周期管理,并且服務本身無狀態(tài)僅用于處理請求。修改 Operator 將任務狀態(tài)主動上報調度服務。二是高可用保障利用 k8s 拉齊,不在依賴主備集群模式,低保障集群狀態(tài)全部切換到對象存儲。詳細如下對比圖
![]()
任務遷移方案
我們線上 Flink 任務版本比較雜 1.14.x、1.12.x 甚至還有 1.9.x,歷史較低版本的 Flink 在能力和性能上還存在缺陷,我們做了一些適配和優(yōu)化,當前絕大多數能力在最新版本基本都已解決。同時 Operator 模式對 Flink 最低版本有要求,所以在遷移時將 Flink 任務升級到最新穩(wěn)定版。考慮整體變化比較多,最靠譜的方式雙跑對數保障準確,根據已有監(jiān)控關注延遲保障效率。
![]()
驗數整體思想是將 Flink 無界流利用 source offset 轉為有界流,工具化解析并替換為測試 source、sink,分別啟動新老版本的測試任務,通過 sum(hash(fields)) 和 count(1) 方式驗證數據準確性。詳細內容如下圖
Jar 任務整體占比不足 10%,提供對數工具、解決方案等輔助方式配合業(yè)務遷移。
核心問題
Flink 版本兼容性問題
Flink 高、低版本 kafka state 不兼容問題
針對需要從 state 恢復的任務,我們需要在遷移任務的過程中,通過 state processor api 讀取老的 state,然后適配為新的 state(state name, state 序列化類等),然后遷移任務;
Flink 高、低版本語法兼容性問題
高版本 cast 方法要求更嚴格,如果要達到和低版本 cast 一樣的效果,需要使用 try_cast
不同類型數據隱士轉換失敗問題(比如 int -> string),高版本要求更嚴格,會直接報錯失敗,需要修改 flink ddl 對應數據類型
高版本 Flink udf 針對 nested 返回格式的函數要求更嚴格,需要顯示在 udf 中設置相關的 DataTypeHint, 否則運行失敗
高版本 kafka connector 和低版本 kafka connector 邏輯差異:低版本解析數據失敗,try catch 打印錯誤; 高版本會直接讓任務失敗。在 sql 中設置 parse-fail.ignore 參數解決;
Flink 升級后,upsert sink 相關的任務的 state 一直變大,最終任務 oom 掛掉
高版本 Flink 針對 cdc 數據寫入 Upsert Sink,會引入 SinkMaterializer 算子,保證亂序數據的準確性,會導致算子 state 一直擴大
原理:高版本 Flink 針對 cdc 數據寫入 Upsert Sink,為了防止 change log 亂序造成的數據準確性的問題, 引入了 SinkUpsertMaterializer 算子,該算子會根據主鍵維護一個 keydState, 在主鍵比較多的情況下,會占用很大的 state。
解決:確保主鍵不亂序的情況下, 設置 table.exec.sink.upsert-materialize = None 或者 設置 state ttl
Flink OOM kill 問題
問題原因:pod oom-kill 基本都是 jvm 的堆外內存溢出導致的,經過內存分析,發(fā)現是 k8s 節(jié)點的透明大頁開啟導致(Yarn 節(jié)點默認關閉)。
核心原理:透明大頁主要是 linux 為了解決大內存分配場景下,因為默認 page 較小導致 page table 過大,CPU 查找虛擬地址到物理地址映射(TLB, Translation Lookaside Buffer)的效率降低引入的。但是針對 Flink 應用等下內存場景下,卻會導致內存過度分配等問題。
解決方案:因為我們的 Flink 場景下,pod 都是小內存,普遍 2G 左右,透明大頁會造成內存浪費的問題。關閉透明大頁后,堆外內存溢出的問題基本沒再出現。
無資源壓力情況下處理效率變低問題
問題原因:任務的 cpu 使用率沒有超過 pod 的 limit 限制,機器的 cpu 使用率也比較低,但是任務卻延遲嚴重。原因為 K8S CPU Throttling 導致的, 雖然沒有超過 limit,但是已經觸發(fā)了多次的 throtting
核心原理:Kubernetes 進行 cpu 資源隔離是通過 Linux CGroup 的(時間周期,默認 100ms=10000us)和(周期內的 CPU 時間上限)共同限制進程 CPU 使用,若進程在周期內用盡,內核會 throttle(暫停)其 CPU 使用,直到下一個周期開始。實際生產中,突發(fā) / 波動流量會導致進程快速消耗完,進入 throttling 狀態(tài),從而造成進程無法繼續(xù)用 CPU,引發(fā)業(yè)務延遲。
解決方案:全局配置 cpu burst 超用 或者 任務配置更大的 slot(cpu limit)緩解 throtting(我們線上采用這種方式)
資源文件依賴問題
Flink on k8s 支持了 remote artifact fetching 能力,但是不支持 zip 依賴、http 重定向報錯、TM 無法引用等問題。對 HttpArtifactFetcher 進行了擴展。
支持遞歸提取 tgz、zip 等壓縮包中的 jar 包,確保依賴資源的正確加載。
兼容 http 到 https 的協(xié)議重定向,解決了跨協(xié)議資源獲取的問題。
在 TaskManager 啟動時,將相關依賴加載到其 classpath 中,保證任務的正常運行。
ConfigMap 優(yōu)化
默認 Flink 保留近三天的 checkpoint,個數比較多,configMap 中存儲的元信息比較大,導致超過 etcd 1MB 大小限制同時當 configmap 較大、大量 Flink 作業(yè)訪問 configmap 時,對 etcd 的網絡 IO 和磁盤 IO、apiserver 的內存都有較大的壓力。
我們的做法是:修改 Flink 源碼,在寫入 configmap 的時候,將 configmap 相關的信息先 gzip 壓縮再存儲,讀取的時候先 gzip 解壓再返回,以此降低存儲大小保證管控面的穩(wěn)定性。另外該能力通過參數的形式進行控制,這樣也保留了原生 Flink ConfigMap 讀寫的能力
節(jié)點負載不均問題
產生節(jié)點負載不均的原因有多種場景,例如 Cpu 密集型 Flink 任務多個 TM 分配到相同節(jié)點導致 Cpu 覆蓋高、新增任務時非資源使用高峰導致調度誤判引起高峰期時負載不均、高峰期過渡驅逐導致預期外任務重啟。解決方案多種邏輯配合進行,首先 Pod 反親和,盡可能將單個 Flink Job pod 打散,避免單資源密集型任務 pod 過渡集中。其次高負載節(jié)點降壓,以 Pod 實際占用資源和負載情況打分,逐步驅逐高分 Pod。還有負載調度,預選階段根據負載指標閾值,排除高負載節(jié)點,優(yōu)選階段綜合考慮節(jié)點負載、Pod 親和性、Pod 數量、鏡像本地性等因素進行打分,將任務調度到最優(yōu)節(jié)點。
任務停止慢問題
任務暫停時,我們通過平臺調度服務調用 k8s api server 直接將 flinkdeploymnet 給刪掉,Operator 檢測到刪除 event 事件后進入 cleanup 流程,清理 metrics、autoscaler 等相關信息、刪除 Job Manager Pod、Deployment、Ha 數據等。整體流程似乎沒啥問題,但是耗時比較慢大概 1~2min,即便任務 pod 數量很少也是一樣。
分析 Operator 日志發(fā)現,耗時主要發(fā)生在刪除 jobmanager 階段。老的 taskmanager 收到 SIGNAL 15: SIGTERM,然后 jobmanager 申請新的 taskmanager。分析 k8s 的審計日志,在刪除 deployment 的時候出現了 taskmanager pod 新創(chuàng)建的情況,新增的 taskmanager pod 因為 flink configmap 被刪除,導致 pod 無法 mount,然后 pending 1~2min。
綜合分析,flinkdeployment finalizer 自身的刪除邏輯和 k8s 資源刪除邏輯同步執(zhí)行,導致觸發(fā)了 flink 任務的 failover 流程,從而出現新增 taskmanager pod 的情況出現;同時由于 flink configmap 也同步被刪除,導致 creating pod 卡住,進一步導致 deployment 的刪除卡住。解決方案在刪除資源的時候,把 Propagation 從 Foreground 改為了默認的 Background,讓 K8S 先走完 Operator 自身的刪除流程,再走默認的資源清楚邏輯。
整體收益
目前整體已有 95% 的任務完成遷移,總體資源消耗降低 20%。集群數量減少,平臺復雜度降低,運維成本減低。集群整體 sla 提高,向上拉齊。
未來探索
Flink 任務的彈性擴縮和資源調優(yōu)(基于 Flink Kubernetes Operator)。目前高低峰明顯,實際機器資源利用率差 60% 多;
現有業(yè)務實時、離線場景天然錯峰,考慮基于 k8s 彈性混布。
![]()
特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發(fā)布,本平臺僅提供信息存儲服務。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.