我們往已經(jīng)部署好的Kafka集群里面添加機(jī)器是最正常不過的需求,而且添加起來非常地方便,我們需要做的事是從已經(jīng)部署好的Kafka節(jié)點(diǎn)中復(fù)制相應(yīng)的配置文件,然后把里面的broker id修改成全局唯一的,最后啟動(dòng)這個(gè)節(jié)點(diǎn)即可將它加入到現(xiàn)有Kafka集群中。
但是問題來了,新添加的Kafka節(jié)點(diǎn)并不會(huì)自動(dòng)地分配數(shù)據(jù),所以無法分擔(dān)集群的負(fù)載,除非我們新建一個(gè)topic。但是現(xiàn)在我們想手動(dòng)將部分分區(qū)移到新添加的Kafka節(jié)點(diǎn)上,Kafka內(nèi)部提供了相關(guān)的工具來重新分布某個(gè)topic的分區(qū)。在重新分布topic分區(qū)之前,我們先來看看現(xiàn)在topic的各個(gè)分區(qū)的分布位置:
. /bin/kafka-topics .sh --topic iteblog --describe --zookeeper www.iteblog.com:2181 Topic:iteblog PartitionCount:7 ReplicationFactor:2 Configs: Topic: iteblog Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: iteblog Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: iteblog Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4 Topic: iteblog Partition: 3 Leader: 4 Replicas: 4,1 Isr: 4,1 Topic: iteblog Partition: 4 Leader: 1 Replicas: 1,3 Isr: 1,3 Topic: iteblog Partition: 5 Leader: 2 Replicas: 2,4 Isr: 2,4 Topic: iteblog Partition: 6 Leader: 3 Replicas: 3,1 Isr: 3,1 |
從上面的輸出可以看出,iteblog主題一共有7個(gè)分區(qū),但是我們broker的個(gè)數(shù)只有4個(gè),所以會(huì)導(dǎo)致某些broker維護(hù)更多的分區(qū)?,F(xiàn)在我們在現(xiàn)有集群的基礎(chǔ)上再添加一個(gè)Kafka節(jié)點(diǎn),然后使用Kafka自帶的kafka-reassign-partitions.sh
工具來重新分布分區(qū)。該工具有三種使用模式:
1、generate模式,給定需要重新分配的Topic,自動(dòng)生成reassign plan(并不執(zhí)行)
2、execute模式,根據(jù)指定的reassign plan重新分配Partition
3、verify模式,驗(yàn)證重新分配Partition是否成功
現(xiàn)在我們需要將原先分布在broker 1-4節(jié)點(diǎn)上的分區(qū)重新分布到broker 1-5節(jié)點(diǎn)上,借助kafka-reassign-partitions.sh工具生成reassign plan,不過我們先得按照要求定義一個(gè)文件,里面說明哪些topic需要重新分區(qū),文件內(nèi)容如下:
[iteblog@www.iteblog.com ~]$ cat topics-to-move.json { "topics" : [{ "topic" : "iteblog" }], "version" :1 } |
然后使用kafka-reassign-partitions.sh
工具生成reassign plan
[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4,5" --generateCurrent partition replica assignment{"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[4,1]},{"topic":"iteblog","partition":5,"replicas":[2,4]},{"topic":"iteblog","partition":4,"replicas":[1,3]},{"topic":"iteblog","partition":0,"replicas":[1,2]},{"topic":"iteblog","partition":6,"replicas":[3,1]},{"topic":"iteblog","partition":1,"replicas":[2,3]},{"topic":"iteblog","partition":2,"replicas":[3,4]}]}Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[3,5]},{"topic":"iteblog","partition":5,"replicas":[5,3]},{"topic":"iteblog","partition":4,"replicas":[4,1]},{"topic":"iteblog","partition":0,"replicas":[5,2]},{"topic":"iteblog","partition":6,"replicas":[1,4]},{"topic":"iteblog","partition":1,"replicas":[1,3]},{"topic":"iteblog","partition":2,"replicas":[2,4]}]}
Proposed partition reassignment configuration下面生成的就是將分區(qū)重新分布到broker 1-5上的結(jié)果。我們將這些內(nèi)容保存到名為result.json文件里面(文件名不重要,文件格式也不一定要以json為結(jié)尾,只要保證內(nèi)容是json即可),然后執(zhí)行這些reassign plan:
[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --executeCurrent partition replica assignment{"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[4,1]},{"topic":"iteblog","partition":5,"replicas":[2,4]},{"topic":"iteblog","partition":4,"replicas":[1,3]},{"topic":"iteblog","partition":0,"replicas":[1,2]},{"topic":"iteblog","partition":6,"replicas":[3,1]},{"topic":"iteblog","partition":1,"replicas":[2,3]},{"topic":"iteblog","partition":2,"replicas":[3,4]}]}Save this to use as the --reassignment-json-file option during rollbackSuccessfully started reassignment of partitions {"version":1,"partitions":[{"topic":"iteblog","partition":1,"replicas":[1,3]},{"topic":"iteblog","partition":5,"replicas":[5,3]},{"topic":"iteblog","partition":4,"replicas":[4,1]},{"topic":"iteblog","partition":6,"replicas":[1,4]},{"topic":"iteblog","partition":2,"replicas":[2,4]},{"topic":"iteblog","partition":0,"replicas":[5,2]},{"topic":"iteblog","partition":3,"replicas":[3,5]}]}
這樣Kafka就在執(zhí)行reassign plan,我們可以校驗(yàn)reassign plan是否執(zhí)行完成:
[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --verifyStatus of partition reassignment:Reassignment of partition [iteblog,1] completed successfullyReassignment of partition [iteblog,5] is still in progressReassignment of partition [iteblog,4] completed successfullyReassignment of partition [iteblog,6] completed successfullyReassignment of partition [iteblog,2] completed successfullyReassignment of partition [iteblog,0] is still in progressReassignment of partition [iteblog,3] completed successfully[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --verifyStatus of partition reassignment:Reassignment of partition [iteblog,1] completed successfullyReassignment of partition [iteblog,5] completed successfullyReassignment of partition [iteblog,4] completed successfullyReassignment of partition [iteblog,6] completed successfullyReassignment of partition [iteblog,2] completed successfullyReassignment of partition [iteblog,0] completed successfullyReassignment of partition [iteblog,3] completed successfully
可以看出,分區(qū)正在Reassignment的狀態(tài)是still in progress;如果分區(qū)Reassignment完成則completed successfully,然后我們就可以看到分區(qū)已經(jīng)按照生成的reassign plan進(jìn)行,我們可以看下topic各個(gè)分區(qū)現(xiàn)在的分布情況:
[iteblog@www.iteblog.com ~]$ ./bin/kafka-topics.sh --topic iteblog --describe --zookeeper www.iteblog.com:2181Topic:iteblog PartitionCount:7 ReplicationFactor:2 Configs: Topic: iteblog Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: iteblog Partition: 1 Leader: 1 Replicas: 1,3 Isr: 3,1 Topic: iteblog Partition: 2 Leader: 2 Replicas: 2,4 Isr: 4,2 Topic: iteblog Partition: 3 Leader: 3 Replicas: 3,5 Isr: 3,5 Topic: iteblog Partition: 4 Leader: 1 Replicas: 4,1 Isr: 1,4 Topic: iteblog Partition: 5 Leader: 5 Replicas: 5,3 Isr: 3,5 Topic: iteblog Partition: 6 Leader: 1 Replicas: 1,4 Isr: 1,4
分區(qū)的分布的確和操作之前不一樣了,broker 5上已經(jīng)有分區(qū)分布上去了。但是仔細(xì)的同學(xué)應(yīng)該可以發(fā)現(xiàn),broker 4上居然沒有分區(qū)的Leader,這肯定不是我們想要的!所以使用kafka-reassign-partitions.sh
工具生成的reassign plan只是一個(gè)建議,方便大家而已。其實(shí)我們自己完全可以編輯一個(gè)reassign plan,然后執(zhí)行它,如下:
{ "version" : 1, "partitions" : [ { "topic" : "iteblog" , "partition" : 0, "replicas" : [ 1, 2 ] }, { "topic" : "iteblog" , "partition" : 1, "replicas" : [ 2, 3 ] }, { "topic" : "iteblog" , "partition" : 2, "replicas" : [ 3, 4 ] }, { "topic" : "iteblog" , "partition" : 3, "replicas" : [ 4, 5 ] }, { "topic" : "iteblog" , "partition" : 4, "replicas" : [ 5, 1 ] }, { "topic" : "iteblog" , "partition" : 5, "replicas" : [ 1, 3 ] }, { "topic" : "iteblog" , "partition" : 6, "replicas" : [ 2, 4 ] } ] } |
將上面的json數(shù)據(jù)文件保存到result.json文件中,然后也是執(zhí)行它:
[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --execute
等這個(gè)reassign plan執(zhí)行完,我們再來看看分區(qū)的分布:
[iteblog@www.iteblog.com ~]$ ./bin/kafka-topics.sh --topic iteblog --describe --zookeeper www.iteblog.com:2181Topic:iteblog PartitionCount:7 ReplicationFactor:2 Configs: Topic: iteblog Partition: 0 Leader: 1 Replicas: 1,2 Isr: 2,1 Topic: iteblog Partition: 1 Leader: 2 Replicas: 2,3 Isr: 3,2 Topic: iteblog Partition: 2 Leader: 3 Replicas: 3,4 Isr: 4,3 Topic: iteblog Partition: 3 Leader: 4 Replicas: 4,5 Isr: 5,4 Topic: iteblog Partition: 4 Leader: 5 Replicas: 5,1 Isr: 1,5 Topic: iteblog Partition: 5 Leader: 1 Replicas: 1,3 Isr: 3,1 Topic: iteblog Partition: 6 Leader: 2 Replicas: 2,4 Isr: 4,2
聯(lián)系客服