本文轉(zhuǎn)載自「之家技術(shù)」,作者劉首維。介紹了汽車之家在基于 Flink 的實時物化視圖的一些實踐經(jīng)驗與探索,并嘗試讓用戶直接以批處理 SQL 的思路開發(fā) Flink Streaming SQL 任務(wù)。主要內(nèi)容為:
- 系統(tǒng)分析與問題拆解
- 問題解決與系統(tǒng)實現(xiàn)
- 實時物化視圖實踐
- 限制與不足
- 總結(jié)與展望
物化視圖這一功能想必大家都不陌生,我們可以通過使用物化視圖,將預(yù)先設(shè)定好的復(fù)雜 SQL 邏輯,以增量迭代的形式實時 (按照事務(wù)地) 更新結(jié)果集,從而通過查詢結(jié)果集來避免每次查詢復(fù)雜的開銷,從而節(jié)省時間與計算資源。事實上,很多數(shù)據(jù)庫系統(tǒng)和 OLAP 引擎都不同程度地支持了物化視圖。另一方面,Streaming SQL 本身就和物化視圖有著很深的聯(lián)系,那么基于 Apche Flink (下稱 Flink) SQL 去做一套實時物化視圖系統(tǒng)是一件十分自然而然的事情了。
本文介紹了汽車之家 (下稱之家) 在基于 Flink 的實時物化視圖的一些實踐經(jīng)驗與探索,并嘗試讓用戶直接以批處理 SQL 的思路開發(fā) Flink Streaming SQL 任務(wù)。希望能給大家?guī)硪恍﹩l(fā),共同探索這一領(lǐng)域。
Flink 在 Table & SQL 模塊做了大量的工作,F(xiàn)link SQL 已經(jīng)實現(xiàn)了一套成熟與相對完備的 SQL 系統(tǒng),同時,我們也在 Flink SQL 上有著比較多的技術(shù)和產(chǎn)品積累,直接基于 Flink SQL 本身就已經(jīng)解決了構(gòu)建實時物化系統(tǒng)的大部分問題,而唯一一個需要我們解決的問題是如何不重不漏地生成數(shù)據(jù)源表對應(yīng)的語義完備的 Changelog DataStream,包括增量和全量歷史兩部分。
雖然規(guī)約到只剩一個問題,但是這個問題解決起來還是比較困難的,那我們將這個問題繼續(xù)拆解為以下幾個子問題:
1. 加載全量數(shù)據(jù);
2. 加載增量數(shù)據(jù);
3. 增量數(shù)據(jù)與全量數(shù)據(jù)整合。
復(fù)制代碼
增量數(shù)據(jù)加載還是相對比較好解決的,我們直接復(fù)用實時數(shù)據(jù)傳輸平臺的基礎(chǔ)建設(shè)。數(shù)據(jù)傳輸平臺[1] 已經(jīng)將 Mysql / SqlServer / TiDB 等增量數(shù)據(jù)以統(tǒng)一的數(shù)據(jù)格式寫入到特定的 Kafka Topic 中,我們只要獲取到對應(yīng)的 Kafka Topic 就可以進行讀取即可。
對于全量數(shù)據(jù)載入,我們先后寫了兩個版本。
第一版我們用 Legacy Source 寫了一套 BulkLoadSourceFunction
,這一版的思路比較樸素,就是全量從數(shù)據(jù)源表進行查詢。這個版本確實能完成全量數(shù)據(jù)的加載,但是問題也是比較明顯的。如果在 bulk load 階段作業(yè)發(fā)生了重啟,我們就不得不重新進行全量數(shù)據(jù)加載。對于數(shù)據(jù)量大的表,這個問題帶來的后果還是比較嚴(yán)重的。
對于第一版的固有問題,我們一直都沒有特別好的對策,直到 Flink-CDC[2] 2.0 的發(fā)布。我們參考了 Flink-CDC 的全量數(shù)據(jù)加載階段支持 Checkpoint 的思路,基于 FLIP-27 開發(fā)了新的 BulkLoadSource
。第二版不論在性能上還是可用性上,對比第一版都有了大幅提升。
這三個子問題中,問題三的難度是遠(yuǎn)大于前面兩個子問題的。這個問題的樸素思路或許很簡單,我們只要按照 Key 緩存全部數(shù)據(jù),然后根據(jù)增量數(shù)據(jù)流來觸發(fā) Changelog DataStream 更新即可。
事實上我們也曾按照這個思路開發(fā)了一版整合邏輯的算子。這版算子對于小表還是比較 work 的,但是對于大表,這種思路固有的 overhead 開始變得不可接受。我們曾用一張數(shù)據(jù)量在 12 億,大小約 120G 的 SqlServer 表進行測試,本身就巨大的數(shù)據(jù)再加上 JVM 上不可避免的膨脹,狀態(tài)大小變得比較夸張。經(jīng)過這次測試,我們一致認(rèn)為這樣粗放的策略似乎不適合作為生產(chǎn)版本發(fā)布,于是我們不得不開始重新思考數(shù)據(jù)整合的算法與策略。
在談?wù)撐覀兊乃惴ㄔO(shè)計思路之前,我不得不提到 DBLog[3] 的算法設(shè)計, 這個算法的核心思路利用 watermark 對歷史數(shù)據(jù)進行標(biāo)識,并和對應(yīng)的增量數(shù)據(jù)進行合并,達到不使用鎖即可完成整個增量數(shù)據(jù)和歷史數(shù)據(jù)的整合,F(xiàn)link-CDC 也是基于這個思路進行的實現(xiàn)與改進。在相關(guān)資料搜集和分析的過程中,我們發(fā)現(xiàn)我們的算法思路與 DBLog 的算法的核心思路非常相似, 但是是基于我們的場景和情況進行了設(shè)計與特化。
首先分析我們的情況:
結(jié)合上述情況進行分析,我們來規(guī)約一下這個算法必須要達成的目標(biāo):
經(jīng)過大家的分析與討論后,我們設(shè)計出了一套數(shù)據(jù)整合的算法,命名為 Global Version Based Pause-free Change-Data-Capture Algorithm。
我們同時讀入 BulkLoadSource
的全量數(shù)據(jù)與 RealtimeChangelogSource
增量數(shù)據(jù),并根據(jù)主鍵進行 KeyBy 與 Connect,而算法的核心邏輯主要由之后的 KeyedCoProcess 階段完成。下面交待幾個關(guān)鍵的字段值:
KeyedCoProcess 收到全量數(shù)據(jù)后,不會直接發(fā)送,而是先緩存起來,等到 Watermark 的值大于該 SearchTs 后發(fā)送并清除對應(yīng) version0 版本數(shù)據(jù)的緩存。在等待的期間,如果有對應(yīng)的 Changlog Data,就將被緩存的 Version0 全量數(shù)據(jù)丟棄,然后處理 Changelog Data 并發(fā)送。在整個數(shù)據(jù)處理的流程中,全量數(shù)據(jù)和增量數(shù)據(jù)都是同時進行消費與處理的,完全不需要引入暫停階段來進行數(shù)據(jù)的整合。
增量數(shù)據(jù)在全量數(shù)據(jù)發(fā)送 watermark 之前到來,只發(fā)送增量數(shù)據(jù)即可,全量數(shù)據(jù)直接丟棄
復(fù)制代碼
全量數(shù)據(jù)發(fā)送 watermark 到達后,仍未有對應(yīng)的增量數(shù)據(jù),直接發(fā)送全量數(shù)據(jù)
復(fù)制代碼
我們決定以 Flink Connector 的形式開展算法的實現(xiàn),我們以接入 SDK 的名字 Estuary 為該 Connector 命名。通過使用 DataStreamScanProvider
,來完成 Source 內(nèi)部算子間的串聯(lián),Source 的算子組織如下圖 (chain 到一起的算子已拆開展示)。
BulkLoadSource
/ ChangelogSource
主要負(fù)責(zé)數(shù)據(jù)的讀入和統(tǒng)一格式處理;BulkNormalize
/ ChangelogNormalize
主要是負(fù)責(zé)處理數(shù)據(jù)運行時信息的添加與覆蓋,主鍵語義處理等工作;WatermarkGenerator
是針對算法工作需求定制的 Watermark 生成邏輯的算子;VersionBasedKeyedCoProcess
就是核心的處理合并邏輯和 RowKind 語義完備性的算子。算法實現(xiàn)的過程中還是有很多需要優(yōu)化或者進行權(quán)衡的點。全量數(shù)據(jù)進入 CoProcess 數(shù)據(jù)后,會首先檢查當(dāng)前是否處理過更大版本的數(shù)據(jù),如果沒有的話才進行處理,數(shù)據(jù)首先會被存入 State 中并根據(jù) SearchTs + T (T 是我們設(shè)置的固有時延) 注冊 EventTimeTimer。如果沒有高版本的數(shù)據(jù)到來,定時器觸發(fā)發(fā)送 Version 0 的數(shù)據(jù),否則直接拋棄改為發(fā)送 RowKind 語義處理好的高版本增量數(shù)據(jù)。
另一方面,避免狀態(tài)的無限增長,當(dāng)系統(tǒng)判定 BulkLoad 階段結(jié)束后,會結(jié)束對相關(guān) Flink State 的使用,存在的 State 只要等待 TTL 過期即可。
另外,我們針對在數(shù)據(jù)同步且下游 Sink 支持 Upsert 能力的場景下,開發(fā)了特別優(yōu)化的超輕量模式,可以以超低的 overhead 完成全量+增量的數(shù)據(jù)同步。
開發(fā)完成后,我們的反復(fù)測試修改與驗證,完成 MVP 版本的開發(fā)。
MVP 版本發(fā)布后,我們與用戶同學(xué)一起,進行了基于 Flink 的物化視圖試點。
下面是用戶的一個真實生產(chǎn)需求:有三張表,分別來自于 TiDB /。SqlServer / Mysql,數(shù)據(jù)行數(shù)分別為千萬級 / 億級 / 千萬級,計算邏輯相對復(fù)雜,涉及到去重,多表 Join。原有通過離線批處理產(chǎn)生 T+1 的結(jié)果表。而用戶希望盡可能降低該 Pipeline 的延遲。
由于我們使用的 TiCDC Update 數(shù)據(jù)尚不包含 -U 部分,故 TiDB 表的整合算法還是采取 Legacy Mode 進行加載。
我們與用戶溝通,建議他們以批處理的思路去編寫 Flink SQL,把結(jié)果的明細(xì)數(shù)據(jù)的數(shù)據(jù)輸出到 StarRocks 中。用戶也在我們的協(xié)助下,較為快速地完成了 SQL 的開發(fā),任務(wù)的計算拓補圖如下:
結(jié)果是相當(dāng)讓人驚喜的!我們成功地在保證了數(shù)據(jù)準(zhǔn)確性的情況下,將原來天級延遲的 Pipeline 降低至 10s 左右的延遲。數(shù)據(jù)也從原來查詢 Hive 變?yōu)椴樵?StarRocks,不論從數(shù)據(jù)接入,數(shù)據(jù)預(yù)計算,還是數(shù)據(jù)計算與查詢,實現(xiàn)了全面的實時化。另一方面,三張表每秒的增量最大不超過 300 條,且該任務(wù)不存在更新放大的問題,所以資源使用相當(dāng)?shù)纳?。根?jù)監(jiān)控反饋的信息,初始化階段完成后,整個任務(wù) TM 部分只需要使用 1 個 Cpu (on YARN),且 Cpu 使用常態(tài)不超過 20%。對比原來批處理的資源使用,無疑也是巨大提升。
正如上文提到的,對于數(shù)據(jù)同步,我們做了專門的優(yōu)化。只需要使用專用的 Source 表,就可以一鍵開啟歷史數(shù)據(jù) + 增量數(shù)據(jù)數(shù)據(jù)同步,大大簡化了數(shù)據(jù)同步的流程。我們目前嘗試使用該功能將數(shù)據(jù)同步至基于 Iceberg 的數(shù)據(jù)湖中,從數(shù)據(jù)同步層面大幅提升數(shù)據(jù)新鮮度。
雖然我們在這個方向的探索取得了一定成果,但是仍有一定的限制和不足。
仔細(xì)閱讀上面算法原理,我們會發(fā)現(xiàn),不論是 SearchTs 的生成還是 Watermark 的生成,實際上最后都依賴了服務(wù)器系統(tǒng)的時鐘,而非依賴類似 Time Oracle 機制。我們雖然算法實現(xiàn)上引入固有延遲去規(guī)避這個問題,但是如果服務(wù)器出現(xiàn)非常嚴(yán)重時鐘不一致,超過固有延遲的話,此時 watermark 是不可靠的,有可能會造成處理邏輯的錯誤。
經(jīng)確認(rèn),之家服務(wù)器時鐘會進行校準(zhǔn)操作。
事實上我們目前這套實現(xiàn)沒有任何事務(wù)相關(guān)的保證機制,僅能承諾結(jié)果的最終一致性,最終一致性其實是一種相當(dāng)弱的保證。就拿上文提到的例子來說,如果其中一張表存在 2 個小時的消費延遲,另一張表基本不存在延遲,這個時候兩表 Join 產(chǎn)生的結(jié)果其實是一種中間狀態(tài),或者說對于外部系統(tǒng)應(yīng)該是不可見的。
為了完成更高的一致性保證,避免上面問題的產(chǎn)生,我們自然會想到引入事務(wù)提交機制。然而目前我們暫時沒有找到比較好的實現(xiàn)思路,但是可以探討下我們目前的思考。
事務(wù)這個概念想必大家或多或少都有認(rèn)識,在此不多贅述。如何數(shù)據(jù)庫系統(tǒng)內(nèi)部定義事務(wù)是一件特別自然且必要的事情,但是如何在這種跨數(shù)據(jù)源場景下定義事務(wù),其實是一件非常困難的事情。還是以上文的例子來展開,我們能看到數(shù)據(jù)源來自各種不同數(shù)據(jù)庫,我們其實對于單表記錄了對應(yīng)的事務(wù)信息,但是確實沒有辦法定義來自不同數(shù)據(jù)源的統(tǒng)一事務(wù)。我們目前的樸素思路是根據(jù)數(shù)據(jù)產(chǎn)生的時間為基準(zhǔn),結(jié)合 checkpoint 統(tǒng)一劃定 Epoch,實現(xiàn)類似 Epoch-based Commit 的提交機制。但是這樣做又回到前面提到的問題,需要對服務(wù)器時間產(chǎn)生依賴,無法從根源保證正確性。
對于 Flink 物化視圖一致性提交這個問題,TiFlink[4] 已經(jīng)做了很多相關(guān)工作。但是我們的 Source 來自不同數(shù)據(jù)源,且讀取自 Kafka,所以問題變得更為復(fù)雜,還是上面提到的例子,兩張表 Join 過后,如果想保證一致性,不只是 Source 和 Sink 算子,整個關(guān)系代數(shù)算子體系都需要考慮引入事務(wù)提交的概念和機制,從而避免中間狀態(tài)的對外部系統(tǒng)的發(fā)布。
這個問題其實比較好理解。現(xiàn)在有兩張表 join,對于左表的每一行數(shù)據(jù),對應(yīng)右表都有 n (n > 100) 條數(shù)據(jù)與之對應(yīng)。那么現(xiàn)在更新左表的任意一行,都會有 2n 的更新放大。
目前整套算法在全量同步階段的 Overhead 雖然可控,但是仍有優(yōu)化空間。我們目前實測,對于一張數(shù)據(jù)量在 1 億左右的表,在全量數(shù)據(jù)階段,需要峰值最大為 1.5G 左右的 State。我們打算在下個版本繼續(xù)優(yōu)化狀態(tài)大小,最直接的思路就是 BulkSource
通知 KeyedCoProcess
哪些主鍵集合是已經(jīng)處理完畢的,這樣可以使對應(yīng)的 Key 提早進入全量階段完成模式,從而進一步優(yōu)化狀態(tài)大小。
本文分析了基于 Flink 物化視圖實現(xiàn)的問題與挑戰(zhàn),著重介紹了處理生成完整的 Changelog DataStream 的算法與實現(xiàn)和在業(yè)務(wù)上的收益,也充分闡述了目前的限制與不足。
雖然這次實踐的結(jié)果稱不上完備,存在一些問題亟待解決,但是我們?nèi)钥吹搅司薮蟮耐黄婆c進步,不論是從技術(shù)還是業(yè)務(wù)使用上。我們充分相信未來這項技術(shù)會越來越成熟,越來越被更多人認(rèn)可和使用,也通過此次探索充分驗證了流處理和批處理的統(tǒng)一性。
我們目前的實現(xiàn)還處在早期版本,仍有著工程優(yōu)化和 bug fix 的空間與工作 (比如前文提到的兩表的推進的 skew 太大問題,可以嘗試引入 Coordinator 進行調(diào)節(jié)與對齊),但是相信隨著不斷的迭代與發(fā)展,這項工作會變得越來越穩(wěn)固,從而支撐更多業(yè)務(wù)場景,充分提升數(shù)據(jù)處理的質(zhì)量與效率!
特別鳴謝張茄子和云邪老師的幫助與勘誤。
[1] mp.weixin.qq.com/s/KQH-relbr…
[4] zhuanlan.zhihu.com/p/422931694