展會信息港展會大全

開箱即用的解決方案分享:亞馬遜云科技海外服務(wù)器如何簡單實(shí)現(xiàn)數(shù)據(jù)備份與還原
來源:互聯(lián)網(wǎng)   發(fā)布日期:2023-12-29 10:19:34   瀏覽:17928次  

導(dǎo)讀:亞馬遜云科技Amazon MSK是Amazon云平臺提供的托管Kafka服務(wù)。在系統(tǒng)升級或遷移時,用戶常常需要將一個Amazon MSK集群中的數(shù)據(jù)導(dǎo)出(備份),然后在新集群或另一個集群中再將數(shù)據(jù)導(dǎo)入(還原)。通常,Kafka集群間的數(shù)據(jù)復(fù)制和同步多采用Kafka MirrorMaker,但...

亞馬遜云科技Amazon MSK是Amazon云平臺提供的托管Kafka服務(wù)。在系統(tǒng)升級或遷移時,用戶常常需要將一個Amazon MSK集群中的數(shù)據(jù)導(dǎo)出(備份),然后在新集群或另一個集群中再將數(shù)據(jù)導(dǎo)入(還原)。通常,Kafka集群間的數(shù)據(jù)復(fù)制和同步多采用Kafka MirrorMaker,但是,在某些場景中,受環(huán)境限制,兩個于Kafka集群之間的網(wǎng)絡(luò)可能無法連通,或者兩個亞馬遜云科技賬號相互隔離,亦或是需要將Kafka的數(shù)據(jù)沉淀為文件存儲以備他用。此時,基于Kafka Connect S3 Source/Sink Connector的方案會是一種較為合適的選擇,本文就將介紹一下這一方案的具體實(shí)現(xiàn)。

數(shù)據(jù)的導(dǎo)出、導(dǎo)入、備份、還原通常都是一次性操作,為此搭建完備持久的基礎(chǔ)設(shè)施并無太大必要,省時省力,簡單便捷才是優(yōu)先的考量因素。為此,本文將提供一套開箱即用的解決方案,方案使用Docker搭建Kafka Connect,所有操作均配備自動化Shell腳本,用戶只需設(shè)置一些環(huán)境變量并執(zhí)行相應(yīng)腳本即可完成全部工作。這種基于Docker的單體模式可以應(yīng)對中小型規(guī)模的數(shù)據(jù)同步和遷移,如果要尋求穩(wěn)定、健壯的解決方案,可以考慮將Docker版本的Kafka Connect遷移到Kubernetes或Amazon MSK Connect,實(shí)現(xiàn)集群化部署。

整體架構(gòu)

首先介紹一下方案的整體架構(gòu)。導(dǎo)出/導(dǎo)入和備份/還原其實(shí)是兩種高度類似的場景,但為了描述清晰,我們還是分開討論。先看一下導(dǎo)出/導(dǎo)入的架構(gòu)示意圖:

在這個架構(gòu)中,Source端的MSK是數(shù)據(jù)流的起點(diǎn),安裝了S3 Sink Connector的Kafka Connect會從Source端的MSK中提取指定Topic的數(shù)據(jù),然后以Json或Avro文件的形式存儲到S3上;同時,另一個安裝了S3 Source Connector的Kafka Connect會從S3上讀取這些Json或Avro文件,然后寫入到Sink端MSK的對應(yīng)Topic中。如果Source端和Sink端的MSK集群不在同一個Region,可以在各自的Region分別完成導(dǎo)入和導(dǎo)出,然后在兩個Region之間使用S3的Cross-Rejion Replication進(jìn)行數(shù)據(jù)同步。

該架構(gòu)只需進(jìn)行簡單的調(diào)整,即可用于MSK集群的備份/還原,如下圖所示:先將MSK集群的數(shù)據(jù)備份到S3上,待完成集群的升級、遷移或重建工作后,再從S3上將數(shù)據(jù)恢復(fù)到新建集群即可。

預(yù)設(shè)條件

本文聚焦于Kafka Connect的數(shù)據(jù)導(dǎo)出/導(dǎo)入和備份/還原操作,需要提前準(zhǔn)備:

一臺基于Amazon Linux2的EC2實(shí)例(建議新建純凈實(shí)例),本文所有的實(shí)操腳本都將在該實(shí)例上執(zhí)行,該實(shí)例也是運(yùn)行Kafka Connect Docker Container的宿主機(jī)。

兩個MSK集群,一個作為Source,一個作為Sink;如果只有一個MSK集群也可完成驗證,該集群將既作Source又作Sink。

為聚焦Kafka Connect S3 Source/Sink Connector的核心配置,預(yù)設(shè)MSK集群沒有開啟身份認(rèn)證(即認(rèn)證類型為Unauthenticated),數(shù)據(jù)傳輸方式為PLAINTEXT,以便簡化Kafka Connect的連接配置。

網(wǎng)絡(luò)連通性上要求EC2實(shí)例能訪問S3、Source端MSK集群、Sink端MSK集群。如果在實(shí)際環(huán)境中無法同時連通Source端和Sink端,則可以在兩臺分屬于不同網(wǎng)絡(luò)的EC2上進(jìn)行操作,但它們必須都能訪問S3。如果是跨Region或賬號隔離,則另需配置S3 Cross-Region Replication或手動拷貝數(shù)據(jù)文件。

全局配置

由于實(shí)際操作將不可避免地依賴到具體的亞馬遜云科技賬號以及本地環(huán)境里的各項信息(如AKSK,服務(wù)地址,各類路徑,Topic名稱等),為了保證本文給出的操作腳本具有良好的可移植性,將所有與環(huán)境相關(guān)的信息抽離出來,以全局變量的形式在實(shí)操前集中配置。以下就是全局變量的配置腳本,讀者需要根據(jù)個人環(huán)境設(shè)定這些變量的取值:

為了便于演示和解讀,本文將使用下面的全局配置,其中前6項配置與賬號和環(huán)境強(qiáng)相關(guān),仍需用戶自行修改,腳本中給出的僅為示意值,而后5項配置與MSK數(shù)據(jù)的導(dǎo)入導(dǎo)出息息相關(guān),不建議修改,因為后續(xù)的解讀將基于這里設(shè)定的值展開,待完成驗證后,您可再根據(jù)需要靈活修改后5項配置以完成實(shí)際的導(dǎo)入導(dǎo)出工作。

回到操作流程,登錄準(zhǔn)備好的EC2實(shí)例,修改下面腳本中與賬號和環(huán)境相關(guān)的前6項配置,然后執(zhí)行修改后的腳本。此外,需要提醒注意的是:在后續(xù)操作中,部分腳本執(zhí)行后將不再返回,而是持續(xù)占用當(dāng)前窗口輸出日志或Kafka消息,因此需要新開命令行窗口,每次新開窗口都需要執(zhí)行一次這里的全局配置腳本。

關(guān)于上述腳本中的后5項配置,有如下詳細(xì)說明:

我們就以腳本中設(shè)定的值為例,解讀一下這5項配置聯(lián)合起來將要實(shí)現(xiàn)的功能,同時也是本文將演示的主要內(nèi)容:

在Source端的MSK集群上存在兩個名為source-topic-1和source-topic-2的Topic,通過安裝有S3 Sink Connector的Kafka Connect(Docker容器)將兩個Topic的數(shù)據(jù)導(dǎo)出到S3的指定存儲桶中,然后再通過安裝有S3 Source Connector的Kafka Connect(Docker容器,可以和S3 Source Connector共存為一個Docker容器)將S3存儲桶中的數(shù)據(jù)寫入到Sink端的MSK集群上,其中原source-topic-1的數(shù)據(jù)將被寫入sink-topic-1,原source-topic-2的數(shù)據(jù)將被寫入sink-topic-2。

特別地,如果是備份/還原場景,需要保持導(dǎo)出/導(dǎo)入的Topic名稱一致,此時,可直接刪除S3 Source Connector中以transforms開頭的4項配置(將在下文中出現(xiàn)),或者將下面兩項改為:

如果只有一個MSK集群,同樣可以完成本文的驗證工作,只需將SOURCE_KAFKA_BOOTSTRAP_SEVERS和SINK_KAFKA_BOOTSTRAP_SEVERS同時設(shè)置為該集群即可,這樣,該集群既是Source端又是Sink端,由于配置中的Source Topics和Sink Topics并不同名,所以不會產(chǎn)生沖突。

環(huán)境準(zhǔn)備

安裝工具包

在EC2上執(zhí)行以下腳本,安裝并配置jq,yq,docker,jdk,kafka-console-client五個必須的軟件包,可以根據(jù)自身EC2的情況酌情選擇安裝全部或部分軟件。建議使用純凈的EC2實(shí)例,完成全部的軟件安裝:

創(chuàng)建S3存儲桶

整個方案以S3作為數(shù)據(jù)轉(zhuǎn)儲媒介,為此需要在S3上創(chuàng)建一個存儲桶。Source端MSK集群的數(shù)據(jù)將會導(dǎo)出到該桶中并以Json文件形式保存,向Sink端MSK集群導(dǎo)入數(shù)據(jù)時,讀取的也是存儲在該桶中的Json文件。

在源MSK上創(chuàng)建Source Topics

為了確保Topics數(shù)據(jù)能完整備份和還原,S3 Source Connector建議Sink Topics的分區(qū)數(shù)最好與Source Topics保持一致,如果讓MSK自動創(chuàng)建Topic,則很有可能會導(dǎo)致Source Topics和Sink Topics的分區(qū)數(shù)不對等,所以,選擇手動創(chuàng)建Source Topics和Sink Topics,并確保它們的分區(qū)數(shù)一致。以下腳本將創(chuàng)建source-topic-1和source-topic-2兩個Topic,各含9個分區(qū):

在目標(biāo)MSK上創(chuàng)建Sink Topics

原因同上,以下腳本將創(chuàng)建:sink-topic-1和sink-topic-2兩個Topic,各含9個分區(qū):

制作Kafka Connect鏡像

接下來是制作帶S3 Sink Connector和S3 Source Connector的Kafka Connect鏡像,鏡像和容器均以kafka-s3-syncer命名,以下是具體操作:

 

配置并啟動Kafka Connect

鏡像制作完成后,就可以啟動了Kafka Connect了。Kafka Connect有很多配置項,需要提醒注意的是:在下面的配置中,使用的是Kafka Connect內(nèi)置的消息轉(zhuǎn)換器:JsonConverter,如果你的輸入/輸出格式是Avro或Parquet,則需要另行安裝對應(yīng)插件并設(shè)置正確的Converter Class。

上述腳本執(zhí)行后,命令窗口將不再返回,而是會持續(xù)輸出容器日志,因此下一步操作需要新開一個命令行窗口。

配置并啟動S3 Sink Connector

在第5節(jié)的操作中,已經(jīng)將S3 Sink Connector安裝到了Kafka Connect的Docker鏡像中,但是還需要顯式地配置并啟動它。新開一個命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后執(zhí)行以下腳本:

 

配置并啟動S3 Source Connector

同上,在第5節(jié)的操作中,已經(jīng)將S3 Source Connector安裝到了Kafka Connect的Docker鏡像中,同樣需要顯式地配置并啟動它:

至此,整個環(huán)境搭建完畢,一個以S3作為中轉(zhuǎn)媒介的MSK數(shù)據(jù)導(dǎo)出、導(dǎo)入、備份、還原鏈路已經(jīng)處于運(yùn)行狀態(tài)。

測試

現(xiàn)在,來驗證一下整個鏈路是否能正常工作。首先,使用kafka-console-consumer.sh監(jiān)控source-topic-1和sink-topic-1兩個Topic,然后使用腳本向source-topic-1持續(xù)寫入數(shù)據(jù),如果在sink-topic-1看到了相同的數(shù)據(jù)輸出,就說明數(shù)據(jù)成功地從source-topic-1導(dǎo)出然后又導(dǎo)入到了sink-topic-1中,相應(yīng)的,在S3存儲桶中也能看到“沉淀”的數(shù)據(jù)文件。

打開Source Topic

新開一個命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控source-topic-1中的數(shù)據(jù):

打開Sink Topic

新開一個命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控sink-topic-1中的數(shù)據(jù):

向Source Topic寫入數(shù)據(jù)

新開一個命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1:全局配置》,聲明全局變量,然后使用如下命令向source-topic-1中寫入數(shù)據(jù):

現(xiàn)象與結(jié)論

執(zhí)行上述寫入操作后,從監(jiān)控source-topic-1的命令行窗口中可以很快看到寫入的數(shù)據(jù),這說明Source端MSK已經(jīng)開始持續(xù)產(chǎn)生數(shù)據(jù)了,隨后(約1分鐘),即可在監(jiān)控sink-topic-1的命令行窗口中看到相同的輸出數(shù)據(jù),這說明目標(biāo)端的數(shù)據(jù)同步也已開始正常工作。此時,打開S3的存儲桶會發(fā)現(xiàn)大量Json文件,這些Json是由S3 Sink Connector從source-topic-1導(dǎo)出并存放到S3上的,然后S3 Source Connector又讀取了這些Json并寫入到了sink-topic-1中,至此,整個方案的演示與驗證工作全部結(jié)束。

清理

在驗證過程中,可能需要多次調(diào)整并重試,每次重試最好恢復(fù)到初始狀態(tài),以下腳本會幫助清理所有已創(chuàng)建的資源:

小結(jié)

本方案主要定位于輕便易用,在S3 Sink Connector和S3 Source Connector中還有很多與性能、吞吐量相關(guān)的配置,例如:s3.part.size, flush.size, s3.poll.interval.ms, tasks.max等,可以在實(shí)際需要自行調(diào)整,此外,Kafka Connect也可以方便地遷移到Kubernetes或Amazon MSK Connect中以實(shí)現(xiàn)集群化部署。

贊助本站

人工智能實(shí)驗室
相關(guān)內(nèi)容
AiLab云推薦
推薦內(nèi)容
展開

熱門欄目HotCates

Copyright © 2010-2024 AiLab Team. 人工智能實(shí)驗室 版權(quán)所有    關(guān)于我們 | 聯(lián)系我們 | 廣告服務(wù) | 公司動態(tài) | 免責(zé)聲明 | 隱私條款 | 工作機(jī)會 | 展會港