- 將現(xiàn)有的集群上任一個(gè)服務(wù)器上的kafka目錄拷貝到新的服務(wù)器上
- 修改config/server.properties中的broker.id、log.dirs、listeners
- 創(chuàng)建logs.dirs指定的目錄,并設(shè)定讀寫(xiě)權(quán)限(chomd -R 777 XXX)
broker.id=3log.dirs=kafka-logslisteners=PLAINTEXT://172.16.49.174:9092
bin/kafka-server-start.sh config/server.properties &
雖然經(jīng)過(guò)上面兩個(gè)步驟后已經(jīng)完成了集群的擴(kuò)容;但是集群上原有的topic的數(shù)據(jù)不會(huì)自動(dòng)遷移到新的broker上??梢栽谛碌腷roker所在的服務(wù)器上通過(guò)
ls /home/lxh/kafka_2.11-0.10.0.0/kafka-logs
查看到并沒(méi)有一原有的topic名稱(chēng)的文件目錄(因?yàn)閯?chuàng)建topic后會(huì)在config/server.properties中的配置的log.dirs 目錄中生產(chǎn)以topic名稱(chēng)+分區(qū)編號(hào)的文件目錄);那么就需要手動(dòng)的區(qū)遷移數(shù)據(jù)
創(chuàng)建編輯要遷移的topic的 json文件
vi topic-to-move.json
比如要將topic名稱(chēng)為test和paritioer_test的數(shù)據(jù)重新平衡到集群中,就可以新增以下內(nèi)容
{"topics": [{"topic": "test"}, {"topic": "paritioer_test"}], "version":1}
生成遷移分配規(guī)則json文件
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topic-to-move.json --broker-list "0,1,2" --generate
得到的結(jié)果為
Current partition replica assignment {"version":1,"partitions":[{"topic":"test","partition":4,"replicas":[0,1]},{"topic":"test","partition":1,"replicas":[1,0]},{"topic":"paritioer_test","partition":0,"replicas":[0]},{"topic":"test","partition":2,"replicas":[0,1]},{"topic":"test","partition":0,"replicas":[0,1]},{"topic":"test","partition":3,"replicas":[1,0]}]}Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"test","partition":4,"replicas":[1,0]},{"topic":"test","partition":1,"replicas":[1,2]},{"topic":"test","partition":2,"replicas":[2,0]},{"topic":"paritioer_test","partition":0,"replicas":[0]},{"topic":"test","partition":0,"replicas":[0,1]},{"topic":"test","partition":3,"replicas":[0,2]}]}
其中的Current partition replica assignment指的是遷移前的partition replica;Proposed partition reassignment configuration 指的就是遷移分配規(guī)則json。需要將該json文件保存到j(luò)son文件中(如expand-cluster-reassignment.json)
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
注意:在遷移過(guò)程中不能人為的結(jié)束或停止kafka服務(wù),不然會(huì)有數(shù)據(jù)不一致的問(wèn)題
在執(zhí)行的過(guò)程中,可以新開(kāi)一個(gè)終端執(zhí)行以下命令來(lái)查看執(zhí)行是否正確完成
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
輸出
Status of partition reassignment:Reassignment of partition [test,4] completed successfullyReassignment of partition [test,0] completed successfullyReassignment of partition [paritioer_test,0] completed successfullyReassignment of partition [test,3] completed successfullyReassignment of partition [test,2] completed successfullyReassignment of partition [test,1] completed successfully
在遷移完成過(guò)程后,可以使用以下命令看下topic的每個(gè)partitions的分布情況
bin/kafka-topics.sh --zookeeper 172.16.49.173:2181 --describe --topic test
Topic:test PartitionCount:5 ReplicationFactor:2 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test Partition: 2 Leader: 2 Replicas: 2,0 Isr: 0,2 Topic: test Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: test Partition: 4 Leader: 0 Replicas: 1,0 Isr: 0,1
可以看到名為test的topic的有的數(shù)據(jù)以及存在于編號(hào)為2的新broker上了
聯(lián)系客服