国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
使用Java API創(chuàng)建(create),查看(describe),列舉(list),刪除(delete)Kafka主題(Topic)

原文:http://blog.csdn.net/changong28/article/details/39325079

使用Kafka的同學(xué)都知道,我們每次創(chuàng)建Kafka主題(Topic)的時(shí)候可以指定分區(qū)數(shù)和副本數(shù)等信息,如果將這些屬性配置到server.properties文件中,以后調(diào)用Java API生成的主題將使用默認(rèn)值,先改變需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000顯示的修改,我們也希望將此過程在Producer調(diào)用之前通過API的方式進(jìn)行設(shè)定,無需在之前或之后使用腳本進(jìn)行操作,所以才了這篇文章。查看源碼發(fā)現(xiàn),其實(shí)內(nèi)部所有的實(shí)現(xiàn)都是通過TopicCommand的main方法,在此記錄兩種方式:

1、創(chuàng)建主題(Topic)

【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

【JAVA API方式】:

  1. String[] options = new String[]{  
  2.     "--create",  
  3.     "--zookeeper",  
  4.     "zk_host:port/chroot",  
  5.     "--partitions",  
  6.     "20",  
  7.     "--topic",  
  8.     "my_topic_name",  
  9.     "--replication-factor",  
  10.     "3",  
  11.     "--config",  
  12.     "x=y"  
  13. };  
  14. TopicCommand.main(options);  

2、查看所有主題

 

【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181

【JAVA API方式】:

  1. String[] options = new String[]{  
  2.     "--list",  
  3.     "--zookeeper",  
  4.     "localhost:2181"  
  5. };  
  6. TopicCommand.main(options);  


3、查看指定主題:

 

【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

【JAVA API方式】: 

  1. String[] options = new String[]{  
  2.     "--describe",  
  3.     "--zookeeper",  
  4.     "localhost:2181",  
  5.     "--topic",  
  6.     "my-replicated-topic",  
  7. };  
  8. TopicCommand.main(options);  


4、修改主題:

 

【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:

  1. String[] options = new String[]{  
  2.     "--alter",  
  3.     "--zookeeper",  
  4.     "zk_host:port/chroot",  
  5.     "--topic",  
  6.     "my_topic_name",  
  7.     "--deleteConfig",  
  8.     "x"  
  9. };  
  10. TopicCommand.main(options);  



5、刪除出題:

   【命令方式】:無

   【JAVA API方式】:

    1. String[] options = new String[]{  
    2.     "--zookeeper",  
    3.     "zk_host:port/chroot",  
    4.     "--topic",  
    5.     "my_topic_name"  
    6. };  
    7. DeleteTopicCommand.main(options);  

另:下文kafka刪除topic的方法(出自 “菜光光的博客” 博客,出處http://caiguangguang.blog.51cto.com/1652935/1548069)

0.8的官方文檔提供了一個(gè)刪除topic的命令:

kafka-topics.sh --delete 但是在運(yùn)行時(shí)會(huì)報(bào)錯(cuò)找不到這個(gè)方法。

kafka-topics.sh最終是運(yùn)行了kafka.admin.TopicCommand這個(gè)類,在0.8的源碼中這個(gè)類中沒有找到有delete topic相關(guān)的代碼。

在kafka的admin包下,提供了一個(gè)DeleteTopicCommand的類,可以實(shí)現(xiàn)刪除topic的功能。 

kafka.admin.DeleteTopicCommand 

其中刪除topic的具體實(shí)現(xiàn)代碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
    val topic = options.valueOf(topicOpt)
    val zkConnect = options.valueOf(zkConnectOpt)
    var zkClient: ZkClient = null
    try {
      zkClient = new ZkClient(zkConnect, 3000030000, ZKStringSerializer)
      zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))  //其實(shí)最終還是通過刪除zk里面對(duì)應(yīng)的路徑來實(shí)現(xiàn)刪除topic的功能
      println("deletion succeeded!")
    }
    catch {
      case e: Throwable =>
        println("delection failed because of " + e.getMessage)
        println(Utils.stackTrace(e))
    }
    finally {
      if (zkClient != null)
        zkClient.close()
    }

因?yàn)檫@個(gè)命令只會(huì)刪除zk里面的信息,真實(shí)的數(shù)據(jù)還是沒有刪除,所以需要登錄各個(gè)broker,把對(duì)應(yīng)的topic的分區(qū)數(shù)據(jù)目錄刪除,也可能正因?yàn)檫@一點(diǎn),delete命令才沒有集成到kafka.admin.TopicCommand這個(gè)類。

 

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
storm筆記:Storm+Kafka簡(jiǎn)單應(yīng)用
520瘋狂之后我徹底蒙了,老板讓我做技術(shù)選型,數(shù)據(jù)處理選kafka還是RocketMQ?
Apache Kafka:下一代分布式消息系統(tǒng)
Kafka基本原理和java簡(jiǎn)單使用教程
微服務(wù)之服務(wù)發(fā)現(xiàn)-從原理到實(shí)現(xiàn)
如何使用Curator操作zookeeper
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服