国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
Kafka+Log4j實現(xiàn)日志集中管理

引言

前段時間寫的《Spring+Log4j+ActiveMQ實現(xiàn)遠程記錄日志——實戰(zhàn)+分析》得到了許多同學(xué)的認可,在認可的同時,也有同學(xué)提出可以使用Kafka來集中管理日志,于是今天就來學(xué)習(xí)一下。

特別說明,由于網(wǎng)絡(luò)上關(guān)于Kafka+Log4j的完整例子并不多,我也是一邊學(xué)習(xí)一邊使用,因此如果有解釋得不好或者錯誤的地方,歡迎批評指正,如果你有好的想法,也歡迎留言探討。

第一部分 搭建Kafka環(huán)境

安裝Kafka

下載:http://kafka.apache.org/downloads.html

tar zxf kafka-<version>.tgzcd kafka-<VERSION>

啟動Zookeeper

啟動Zookeeper前需要配置一下config/zookeeper.properties:

接下來啟動Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動Kafka Server

啟動Kafka Server前需要配置一下config/server.properties。主要配置以下幾項,內(nèi)容就不說了,注釋里都很詳細:

然后啟動Kafka Server

bin/kafka-server-start.sh config/server.properties

 創(chuàng)建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看創(chuàng)建的Topic

>bin/kafka-topics.sh --list --zookeeper localhost:2181

啟動控制臺Producer,向Kafka發(fā)送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testThis is a messageThis is another message^C

啟動控制臺Consumer,消費剛剛發(fā)送的消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginningThis is a messageThis is another message

刪除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

注:只有當(dāng)delete.topic.enable=true時,該操作才有效

配置Kafka集群(單臺機器上)

首先拷貝server.properties文件為多份(這里演示4個節(jié)點Kafka集群,因此還需要拷貝3配置文件):

cp config/server.properties config/server1.propertiescp config/server.properties config/server2.propertiescp config/server.properties config/server3.properties

修改server1.properties的以下內(nèi)容:

broker.id=1port=9093log.dir=/tmp/kafka-logs-1

同理修改server2.propertiesserver3.properties的這些內(nèi)容,并保持所有配置文件的zookeeper.connect屬性都指向運行在本機的zookeeper地址localhost:2181。注意,由于這幾個Kafka節(jié)點都將運行在同一臺機器上,因此需要保證這幾個值不同,這里以累加的方式處理。例如在server2.properties上:

broker.id=2port=9094log.dir=/tmp/kafka-logs-2

server3.properties也配置好以后,依次啟動這些節(jié)點:

bin/kafka-server-start.sh config/server1.properties &bin/kafka-server-start.sh config/server2.properties &bin/kafka-server-start.sh config/server3.properties &

Topic & Partition

Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應(yīng)一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件

現(xiàn)在在Kafka集群上創(chuàng)建備份因子為3,分區(qū)數(shù)為4Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka

說明:備份因子replication-factor越大,則說明集群容錯性越強,就是當(dāng)集群down掉后,數(shù)據(jù)恢復(fù)的可能性越大。所有的分區(qū)數(shù)里的內(nèi)容共同組成了一份數(shù)據(jù),分區(qū)數(shù)partions越大,則該topic的消息就越分散,集群中的消息分布就越均勻。

然后使用kafka-topics.sh--describe參數(shù)查看一下Topickafka的詳情:


輸出的第一行是所有分區(qū)的概要,接下來的每一行是一個分區(qū)的描述??梢钥吹?/span>Topickafka的消息,PartionCount=4,ReplicationFactor=3正是我們創(chuàng)建時指定的分區(qū)數(shù)和備份因子。

另外:Leader是指負責(zé)這個分區(qū)所有讀寫的節(jié)點;Replicas是指這個分區(qū)所在的所有節(jié)點(不論它是否活著);ISRReplicas的子集,代表存有這個分區(qū)信息而且當(dāng)前活著的節(jié)點。

partition:0這個分區(qū)來說,該分區(qū)的Leaderserver0,分布在id01,2這三個節(jié)點上,而且這三個節(jié)點都活著。

再來看下Kafka集群的日志:


其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此類推。

從上面的配置可知,id0,12,3的節(jié)點分別對應(yīng)server0, server1, server2, server3。而上例中的partition:0分布在id0, 1, 2這三個節(jié)點上,因此可以在server0, server1, server2這三個節(jié)點上看到有kafka-0這個文件夾。這個kafka-0就代表Topickafkapartion0。

第二部分 Kafka+Log4j項目整合

先來看下Maven項目結(jié)構(gòu)圖:


作為Demo,文件不多。先看看pom.xml引入了哪些jar包:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.9.2</artifactId>    <version>0.8.2.1</version></dependency><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>0.8.2.1</version></dependency><dependency>    <groupId>com.Google.guava</groupId>    <artifactId>guava</artifactId>    <version>18.0</version></dependency>

重要的內(nèi)容是log4j.properties:

log4j.rootLogger=INFO,console# for package com.demo.kafka, log would be sent to kafka appender.log4j.logger.com.demo.kafka=DEBUG,kafka# appender kafkalog4j.appender.kafka=kafka.producer.KafkaLog4jAppenderlog4j.appender.kafka.topic=kafka# multiple brokers are separated by comma ",".log4j.appender.kafka.brokerList=localhost:9092, localhost:9093, localhost:9094, localhost:9095log4j.appender.kafka.compressionType=nonelog4j.appender.kafka.syncSend=truelog4j.appender.kafka.layout=org.apache.log4j.PatternLayoutlog4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n # appender consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=system.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

App.Java里面就很簡單啦,主要是通過log4j輸出日志:

package com.demo.kafka;import org.apache.log4j.Logger;public class App {    private static final Logger LOGGER = Logger.getLogger(App.class);    public static void main(String[] args) throws InterruptedException {        for (int i = 0; i < 20; i++) {            LOGGER.info("Info [" + i + "]");            Thread.sleep(1000);        }    }}

MyConsumer.java用于消費kafka中的信息:

package com.demo.kafka;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import com.google.common.collect.ImmutableMap;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import kafka.message.MessageAndMetadata;public class MyConsumer {    private static final String ZOOKEEPER = "localhost:2181";    //groupName可以隨意給,因為對于kafka里的每條消息,每個group都會完整的處理一遍    private static final String GROUP_NAME = "test_group";    private static final String TOPIC_NAME = "kafka";    private static final int CONSUMER_NUM = 4;    private static final int PARTITION_NUM = 4;    public static void main(String[] args) {        // specify some consumer properties        Properties props = new Properties();        props.put("zookeeper.connect", ZOOKEEPER);        props.put("zookeeper.connectiontimeout.ms", "1000000");        props.put("group.id", GROUP_NAME);        // Create the connection to the cluster        ConsumerConfig consumerConfig = new ConsumerConfig(props);        ConsumerConnector consumerConnector =             Consumer.createJavaConsumerConnector(consumerConfig);        // create 4 partitions of the stream for topic “test”, to allow 4        // threads to consume        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =             consumerConnector.createMessageStreams(                ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(TOPIC_NAME);        // create list of 4 threads to consume from each of the partitions        ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);        // consume the messages in the threads        for (final KafkaStream<byte[], byte[]> stream : streams) {            executor.submit(new Runnable() {                public void run() {                    for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) {                        // process message (msgAndMetadata.message())                        System.out.println(new String(msgAndMetadata.message()));                    }                }            });        }    }}

MyProducer.java用于向Kafka發(fā)送消息,但不通過log4jappender發(fā)送。此案例中可以不要。但是我還是放在這里:

package com.demo.kafka;import java.util.ArrayList;import java.util.List;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class MyProducer {    private static final String TOPIC = "kafka";    private static final String CONTENT = "This is a single message";    private static final String BROKER_LIST = "localhost:9092";    private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";        public static void main(String[] args) {        Properties props = new Properties();        props.put("serializer.class", SERIALIZER_CLASS);        props.put("metadata.broker.list", BROKER_LIST);                ProducerConfig config = new ProducerConfig(props);        Producer<String, String> producer = new Producer<String, String>(config);        //Send one message.        KeyedMessage<String, String> message =             new KeyedMessage<String, String>(TOPIC, CONTENT);        producer.send(message);                //Send multiple messages.        List<KeyedMessage<String,String>> messages =             new ArrayList<KeyedMessage<String, String>>();        for (int i = 0; i < 5; i++) {            messages.add(new KeyedMessage<String, String>                (TOPIC, "Multiple message at a time. " + i));        }        producer.send(messages);    }}

到這里,代碼就結(jié)束了。

第三部分 運行與驗證

先運行MyConsumer,使其處于監(jiān)聽狀態(tài)。同時,還可以啟動Kafka自帶的ConsoleConsumer來驗證是否跟MyConsumer的結(jié)果一致。最后運行App.java。

先來看看MyConsumer的輸出:

再來看看ConsoleConsumer的輸出:

可以看到,盡管發(fā)往Kafka的消息去往了不同的地方,但是內(nèi)容是一樣的,而且一條也不少。最后再來看看Kafka的日志。

我們知道,Topickafka的消息有4partion,從之前的截圖可知這4partion均勻分布在4kafka節(jié)點上,于是我對每一個partion隨機選取一個節(jié)點查看了日志內(nèi)容。

上圖中黃色選中部分依次代表在server0上查看partion0,在server1上查看partion1,以此類推。

紅色部分是日志內(nèi)容,由于在創(chuàng)建Topic時準(zhǔn)備將20條日志分成4個區(qū)存儲,可以很清楚的看到,這20條日志確實是很均勻的存儲在了幾個partion上。

摘一點Infoq上的話:每個日志文件都是一個log entrie序列,每個log entrie包含一個4字節(jié)整型數(shù)值(值為N+5),1個字節(jié)的"magic value",4個字節(jié)的CRC校驗碼,其后跟N個字節(jié)的消息體。每條消息都有一個當(dāng)前Partition下唯一的64字節(jié)的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:

message length : 4 bytes (value: 1+4+n)"magic" value : 1 byte crc : 4 bytes payload : n bytes

這里我們看到的日志文件的每一行,就是一個log entrie,每一行前面無法顯示的字符(藍色選中部分),就是(message length + magic value + crc)了。而log entrie的后部分,則是消息體的內(nèi)容了


問題:如果要使用此種方式,有一種場景是提取某天或者某小時的日志,那么如何設(shè)計Topic呢?是不是要在Topic上帶入日期或者小時數(shù)?還有更好的設(shè)計方案嗎?----歡迎交流,共同進步。


樣例下載:百度網(wǎng)盤

鏈接: http://pan.baidu.com/s/1i400DZv 密碼: f25c


參考頁面:

http://kafka.apache.org/07/quickstart.html

http://kafka.apache.org/documentation.html#quickstart

http://www.infoq.com/cn/articles/kafka-analysis-part-1


本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
linux環(huán)境下 kafka 2.6.0 安裝及配置
三萬字 | Kafka 知識體系保姆級教程寶典
分布式消息系統(tǒng)之Kafka集群部署
kafka搭建入門(手把手教你搭建)
Kafka kafka在windows下的安裝與配置
Kafka 安裝和測試
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服