原文:http://blog.csdn.net/changong28/article/details/39325079
使用Kafka的同學(xué)都知道,我們每次創(chuàng)建Kafka主題(Topic)的時(shí)候可以指定分區(qū)數(shù)和副本數(shù)等信息,如果將這些屬性配置到server.properties文件中,以后調(diào)用Java API生成的主題將使用默認(rèn)值,先改變需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000顯示的修改,我們也希望將此過程在Producer調(diào)用之前通過API的方式進(jìn)行設(shè)定,無需在之前或之后使用腳本進(jìn)行操作,所以才了這篇文章。查看源碼發(fā)現(xiàn),其實(shí)內(nèi)部所有的實(shí)現(xiàn)都是通過TopicCommand的main方法,在此記錄兩種方式:
1、創(chuàng)建主題(Topic)
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
【JAVA API方式】:
2、查看所有主題
【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181
【JAVA API方式】:
3、查看指定主題:
【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
【JAVA API方式】:
4、修改主題:
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:
5、刪除出題:
【命令方式】:無
【JAVA API方式】:
另:下文kafka刪除topic的方法(出自 “菜光光的博客” 博客,出處http://caiguangguang.blog.51cto.com/1652935/1548069)
0.8的官方文檔提供了一個(gè)刪除topic的命令:
kafka-topics.sh --delete 但是在運(yùn)行時(shí)會(huì)報(bào)錯(cuò)找不到這個(gè)方法。
kafka-topics.sh最終是運(yùn)行了kafka.admin.TopicCommand這個(gè)類,在0.8的源碼中這個(gè)類中沒有找到有delete topic相關(guān)的代碼。
在kafka的admin包下,提供了一個(gè)DeleteTopicCommand的類,可以實(shí)現(xiàn)刪除topic的功能。
kafka.admin.DeleteTopicCommand
其中刪除topic的具體實(shí)現(xiàn)代碼如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | import org.I0Itec.zkclient.ZkClient import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} ....... val topic = options.valueOf(topicOpt) val zkConnect = options.valueOf(zkConnectOpt) var zkClient: ZkClient = null try { zkClient = new ZkClient(zkConnect, 30000 , 30000 , ZKStringSerializer) zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) //其實(shí)最終還是通過刪除zk里面對(duì)應(yīng)的路徑來實(shí)現(xiàn)刪除topic的功能 println( "deletion succeeded!" ) } catch { case e: Throwable => println( "delection failed because of " + e.getMessage) println(Utils.stackTrace(e)) } finally { if (zkClient != null ) zkClient.close() } |
因?yàn)檫@個(gè)命令只會(huì)刪除zk里面的信息,真實(shí)的數(shù)據(jù)還是沒有刪除,所以需要登錄各個(gè)broker,把對(duì)應(yīng)的topic的分區(qū)數(shù)據(jù)目錄刪除,也可能正因?yàn)檫@一點(diǎn),delete命令才沒有集成到kafka.admin.TopicCommand這個(gè)類。
聯(lián)系客服