Kafka FAQ

Source: https://cwiki.apache.org/confluence/display/KAFKA/FAQ



Producers
How should I set metadata.broker.list?

The broker list provided to the producer is only used for fetching metadata. Once the metadata response is received, the producer will send produce requests to the broker hosting the corresponding topic/partition directly, using the ip/port the broker registered in ZK. Any broker can serve metadata requests. The client is responsible for making sure that at least one of the brokers in metadata.broker.list is accessible. One way to achieve this is to use a VIP in a load balancer. If brokers change in a cluster, one can just update the hosts associated with the VIP.
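For illustration, here is a minimal sketch of an 0.8-style Java producer that points metadata.broker.list at two seed brokers. It assumes the kafka.javaapi.producer API; the broker host names, topic, and message below are placeholders.

{code}
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MetadataBrokerListExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Seed brokers (or a load-balancer VIP) used only for the initial metadata fetch.
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // Produce requests go directly to the leader broker registered in ZK, not to the seed list.
        producer.send(new KeyedMessage<String, String>("topic1", "hello"));
        producer.close();
    }
}
{code}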

Why do I get QueueFullException in my producer when running in async mode?

This typically happens when the producer is trying to send messages quicker than the broker can handle. If the producer can’t block, one will have to add enough brokers so that they jointly can handle the load. If the producer can block, one can set queue.enqueueTimeout.ms in producer config to -1. This way, if the queue is full, the producer will block instead of dropping messages.
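As a sketch, the relevant producer settings might look like the following; the 0.8 property name is assumed here (in 0.7 the property is queue.enqueueTimeout.ms, as mentioned above).

{code}
# producer.properties (illustrative)
producer.type=async
# -1 = block the caller when the async queue is full instead of dropping messages
queue.enqueue.timeout.ms=-1
{code}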

I am using the ZK-based producer in 0.7 and I see data only produced on some of the brokers, but not all, why?

This is related to an issue in Kafka 0.7.x (see the discussion in http://apache.markmail.org/thread/c7tdalfketpusqkg). Basically, for a new topic, the producer bootstraps using all existing brokers. However, if a topic already exists on some brokers, the producer never bootstraps again when new brokers are added to the cluster. This means that the producer won’t see those new brokers. A workaround is to manually create the log directory for that topic on the new brokers.

Why are my brokers not receiving producer sent messages?

This happened when I tried to enable gzip compression by setting compression.codec to 1. With the code change, not a single message was received by the brokers even though I had called producer.send() 1 million times. No error was printed by the producer and no error could be found in the broker’s kafka-request.log. By adding log4j.properties to my producer’s classpath and switching the log level to DEBUG, I captured the java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyInputStream thrown at the producer side. The error was resolved by adding the snappy jar to the producer’s classpath.

Why is data not evenly distributed among partitions when a partitioning key is not specified?

In Kafka producer, a partition key can be specified to indicate the destination partition of the message. By default, a hashing-based partitioner is used to determine the partition id given the key, and people can use customized partitioners also.

To reduce # of open sockets, in 0.8.0 (https://issues.apache.org/jira/browse/KAFKA-1017), when the partitioning key is not specified or null, a producer will pick a random partition and stick to it for some time (default is 10 mins) before switching to another one. So, if there are fewer producers than partitions, at a given point of time, some partitions may not receive any data. To alleviate this problem, one can either reduce the metadata refresh interval or specify a message key and a customized random partitioner. For more detail see this thread http://mail-archives.apache.org/mod_mbox/kafka-dev/201310.mbox/%3CCAFbh0Q0aVh%2Bvqxfy7H-%2BMnRFBt6BnyoZk1LWBoMspwSmTqUKMg%40mail.gmail.com%3E
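As an illustration of the customized-random-partitioner workaround, here is a minimal sketch for the 0.8.x producer. The exact Partitioner interface differs slightly across 0.8 releases, and the class and package names here are hypothetical.

{code}
import java.util.Random;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class RandomPartitioner implements Partitioner {
    private final Random rand = new Random();

    // 0.8.1+ instantiates partitioners reflectively with a VerifiableProperties argument.
    public RandomPartitioner(VerifiableProperties props) { }

    @Override
    public int partition(Object key, int numPartitions) {
        // Ignore the key and spread messages across all partitions on every send.
        return rand.nextInt(numPartitions);
    }
}
{code}

It would be registered through the producer property partitioner.class (e.g. partitioner.class=com.example.RandomPartitioner, a hypothetical class name), and a message key still has to be supplied so that the partitioner is actually invoked.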

Is it possible to delete a topic?

In the current version, 0.8.0, no. (You could clear the entire Kafka and zookeeper states to delete all topics and data.) But upcoming releases are expected to include a delete topic tool.

在當(dāng)前版本中,0.8.0,不可以。(你可以通過清理整個(gè)kafka及zookeeper中的狀態(tài)去刪除topics 及其數(shù)據(jù)。

Consumers

Why does my consumer never get any data?

By default, when a consumer is started for the very first time, it ignores all existing data in a topic and will only consume new data coming in after the consumer is started. If this is the case, try sending some more data after the consumer is started. Alternatively, you can configure the consumer by setting auto.offset.reset to “smallest”.
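A minimal sketch of an 0.8 high-level consumer configured with auto.offset.reset=smallest; the ZK address, group id, and topic name are placeholders, and auto.offset.reset only takes effect when the group has no committed offset yet.

{code}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class SmallestOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zkhost:2181");
        props.put("group.id", "test-group");
        // Start from the earliest available data when this group has no committed offset yet.
        props.put("auto.offset.reset", "smallest");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("topic1", 1));

        for (MessageAndMetadata<byte[], byte[]> msg : streams.get("topic1").get(0)) {
            System.out.println("offset " + msg.offset() + ": " + new String(msg.message()));
        }
    }
}
{code}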

Why does my consumer get InvalidMessageSizeException?

This typically means that the “fetch size” of the consumer is too small. Each time the consumer pulls data from the broker, it reads bytes up to a configured limit. If that limit is smaller than the largest single message stored in Kafka, the consumer can’t decode the message properly and will throw an InvalidMessageSizeException. To fix this, increase the limit by setting the property “fetch.size” (0.7) / “fetch.message.max.bytes” (0.8) properly in config/consumer.properties. The default fetch.size is 300,000 bytes.
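For example, to allow single messages of up to 2 MB to be fetched (an arbitrary placeholder value):

{code}
# config/consumer.properties (illustrative)
# 0.8 property; the 0.7 equivalent is fetch.size
fetch.message.max.bytes=2097152
{code}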

Should I choose multiple group ids or a single one for the consumers?

If all consumers use the same group id, messages in a topic are distributed among those consumers. In other words, each consumer will get a non-overlapping subset of the messages. Having more consumers in the same group increases the degree of parallelism and the overall throughput of consumption. See the next question for the choice of the number of consumer instances. On the other hand, if each consumer is in its own group, each consumer will get a full copy of all messages.

Why do some of the consumers in a consumer group never receive any messages?

Currently, a topic partition is the smallest unit that we distribute messages among consumers in the same consumer group. So, if the number of consumers is larger than the total number of partitions in a Kafka cluster (across all brokers), some consumers will never get any data. The solution is to increase the number of partitions on the broker.

Why are there many rebalances in my consumer log?

A typical reason for many rebalances is the consumer side GC. If so, you will see Zookeeper session expirations in the consumer log (grep for Expired). Occasional rebalances are fine. Too many rebalances can slow down the consumption and one will need to tune the Java GC setting.

為什么我的消費(fèi)者日志中有很多的再平衡?

這種問題的一個(gè)典型原因是consumer端的GC導(dǎo)致。如果這樣,你會(huì)在consumer日志中看到消費(fèi)者注冊(cè)到zookeeper的會(huì)話過期。偶爾的再平衡是好的。但是,過多的在平衡將會(huì)降低消耗和我們需要調(diào)整設(shè)置java GC。

Can I predict the results of the consumer rebalance?

During the rebalance process, each consumer will execute the same deterministic algorithm to range partition a sorted list of topic-partitions over a sorted list of consumer instances. This makes the whole rebalancing process deterministic. For example, if you only have one partition for a specific topic and are going to have two consumers consuming this topic, only one consumer will get the data from the partition of the topic; and even if the consumer named “Consumer1” is registered after the other consumer named “Consumer2”, it will replace “Consumer2”, gaining ownership of the partition in the rebalance.

Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to allocate to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1 and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
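The arithmetic described above can be sketched as follows; this is a simplified model of the range assignment for illustration, not the actual Kafka code.

{code}
import java.util.List;

public class RangeAssignmentSketch {
    /** Prints which consumer thread owns each partition of one topic. */
    static void assign(int numPartitions, List<String> sortedThreads) {
        int perThread = numPartitions / sortedThreads.size();   // base share per thread
        int extra = numPartitions % sortedThreads.size();       // first 'extra' threads get one more

        for (int i = 0; i < sortedThreads.size(); i++) {
            int start = i * perThread + Math.min(i, extra);
            int count = perThread + (i < extra ? 1 : 0);
            for (int p = start; p < start + count; p++) {
                System.out.println("p" + p + " -> " + sortedThreads.get(i));
            }
        }
    }

    public static void main(String[] args) {
        // Two consumers C1 and C2 with two streams each, five partitions p0..p4.
        assign(5, java.util.Arrays.asList("C1-0", "C1-1", "C2-0", "C2-1"));
        // Prints: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
    }
}
{code}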

My consumer seems to have stopped, why?

First, try to figure out if the consumer has really stopped or is just slow. You can use our tool

ConsumerOffsetChecker

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1

consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703 = 70,121,994,703 (65.31G)
Log size = 70122018287 = 70,122,018,287 (65.31G)
Consumer lag = 23584 = 23,584 (0.00G)

In 0.8, you can also monitor the MaxLag and the MinFetch jmx bean (see http://kafka.apache.org/documentation.html#monitoring).

If consumer offset is not moving after some time, then consumer is likely to have stopped. If consumer offset is moving, but consumer lag (difference between the end of the log and the consumer offset) is increasing, the consumer is slower than the producer. If the consumer is slow, the typical solution is to increase the degree of parallelism in the consumer. This may require increasing the number of partitions of a topic.

The high-level consumer will block if

  • there are no more messages available

    • The ConsumerOffsetChecker will show that the log offset of the partitions being consumed does not change on the broker
  • the next message available is larger than the maximum fetch size you have specified

    • One possibility of a stalled consumer is that the fetch size in the consumer is smaller than the largest message in the broker. You can use the DumpLogSegments tool to figure out the largest message size and set fetch.size in the consumer config accordingly.
  • your client code simply stops pulling messages from the iterator (the blocking queue will fill up).
    • One of the typical causes is that the application code that consumes messages somehow died and therefore killed the consumer thread. We recommend using a try/catch clause to log all Throwable in the consumer logic (see the sketch after this list).
  • consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for “conflict in “).
    • If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try increasing rebalance.max.retries and rebalance.backoff.ms.
    • Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won’t realize that consumer is gone after zookeeper.session.timeout.ms time. In this case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.
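A sketch of the recommended try/catch pattern around the message-processing logic; the stream setup is as in the earlier consumer example, and processMessage stands in for hypothetical application code.

{code}
// Drain one KafkaStream, catching everything so the consumer thread never dies silently.
static void consume(kafka.consumer.KafkaStream<byte[], byte[]> stream) {
    for (kafka.message.MessageAndMetadata<byte[], byte[]> msg : stream) {
        try {
            processMessage(msg);                  // hypothetical application logic
        } catch (Throwable t) {
            // Log and keep consuming; an uncaught Throwable would kill this thread
            // and the blocking queue behind the iterator would eventually fill up.
            System.err.println("Failed at offset " + msg.offset() + ": " + t);
        }
    }
}

static void processMessage(kafka.message.MessageAndMetadata<byte[], byte[]> msg) {
    // application-specific processing goes here
}
{code}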

Why are messages delayed in my consumer?

This could be a general throughput issue. If so, you can use more consumer streams (may need to increase # partitions) or make the consumption logic more efficient.

Another potential issue is when multiple topics are consumed in the same consumer connector. Internally, we have an in-memory queue for each topic, which feeds the consumer iterators. We have a single fetcher thread per broker that issues multi-fetch requests for all topics. The fetcher thread iterates the fetched data and tries to put the data for different topics into its own in-memory queue. If one of the consumers is slow, eventually its corresponding in-memory queue will be full. As a result, the fetcher thread will block on putting data into that queue. Until that queue has more space, no data will be put into the queues for the other topics. Therefore, those other topics, even if they have less volume, will have their consumption delayed because of that. To address this issue, either make sure that all consumers can keep up, or use separate consumer connectors for different topics.

另一個(gè)潛在的問題是使用同一個(gè)連接器消費(fèi)多個(gè)topics。在kafka內(nèi)部,我們每一個(gè)topic都有一個(gè)供消費(fèi)者迭代器使用的內(nèi)存隊(duì)列,……

How to improve the throughput of a remote consumer?

If the consumer is in a different data center from the broker, you may need to tune the socket buffer size to amortize the long network latency. Specifically, for Kafka 0.7, you can increase socket.receive.buffer in the broker, and socket.buffersize and fetch.size in the consumer. For Kafka 0.8, the consumer properties are socket.receive.buffer.bytes and fetch.message.max.bytes.

How can I rewind the offset in the consumer?

If you are using the high level consumer, currently there is no api to reset the offsets in the consumer. The only way is to stop all consumers and reset the offsets for that consumer group in ZK manually. We do have an import/export offset tool that you can use (bin/kafka-run-class.sh kafka.tools.ImportZkOffsets and bin/kafka-run-class.sh kafka.tools.ExportZkOffsets). To get the offsets for importing, we have a GetOffsetShell tool (bin/kafka-run-class.sh kafka.tools.GetOffsetShell) that allows you to get the offsets before a given timestamp. The offsets returned there are the offsets corresponding to the first message of each log segment. So the granularity is very coarse.
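As a sketch, a rewind might look like the following. The option names here are assumptions that may differ across versions, so check each tool's --help output before relying on them.

{code}
# 1. Stop all consumers in the group, then export its current ZK offsets as a backup.
bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect zkhost:2181 --group consumer-group1 --output-file offsets.bak

# 2. Find coarse-grained (segment-boundary) offsets before a given unix timestamp in ms.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker1:9092 --topic topic1 --time 1388534400000

# 3. Edit an offsets file with the desired values, import it, then restart the consumers.
bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect zkhost:2181 --input-file offsets.new
{code}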

I don’t want my consumer’s offsets to be committed automatically. Can I manually manage my consumer’s offsets?

You can turn off the autocommit behavior (which is on by default) by setting auto.commit.enable=false in your consumer’s config. There are a couple of caveats to keep in mind when doing this:

  • You will manually commit offsets using the consumer’s commitOffsets API. Note that this will commit offsets for all partitions that the consumer currently owns. The consumer connector does not currently provide a more fine-grained commit API.

  • If a consumer rebalances for any reason it will fetch the last committed offsets for any partitions that it ends up owning. If you have not yet committed any offsets for these partitions, then it will use the latest or earliest offset depending on whether auto.offset.reset is set to largest or smallest (respectively).
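A sketch of the manual-commit flow, assuming the 0.8 high-level consumer and the same imports as the earlier consumer example:

{code}
Properties props = new Properties();
props.put("zookeeper.connect", "zkhost:2181");
props.put("group.id", "test-group");
props.put("auto.commit.enable", "false");   // turn off periodic autocommit

ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// ... create streams, consume and process a batch of messages ...

// Commits the current offsets of ALL partitions this connector owns;
// the 0.8 connector does not offer a finer-grained commit.
connector.commitOffsets();
{code}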

What is the relationship between fetch.wait.max.ms and socket.timeout.ms on the consumer?

fetch.wait.max.ms controls how long a fetch request will wait on the broker in the normal case. The issue is that if there is a hard crash on the broker (host is down), the client may not realize this immediately since TCP will try very hard to maintain the socket connection. By setting socket.timeout.ms, we allow the client to break out sooner in this case. Typically, socket.timeout.ms should be set to be at least fetch.wait.max.ms or a bit larger. It’s possible to specify an indefinite long poll by setting fetch.wait.max.ms to a very large value. It’s not recommended right now due to https://issues.apache.org/jira/browse/KAFKA-1016. The consumer-config documentation states that “The actual timeout set will be max.fetch.wait + socket.timeout.ms.”; however, the code has not behaved that way for a while. https://issues.apache.org/jira/browse/KAFKA-1147 is filed to fix it.
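For example (placeholder values), keeping socket.timeout.ms at or above fetch.wait.max.ms:

{code}
# config/consumer.properties (illustrative)
fetch.wait.max.ms=500
# at least fetch.wait.max.ms, or a bit more, so a hard-crashed broker is detected promptly
socket.timeout.ms=1000
{code}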

How do I get exactly-once messaging from Kafka?

Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.

There are two approaches to getting exactly once semantics during data production:

1. Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded.

2. Include a primary key (UUID or something) in the message and deduplicate on the consumer.

If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn’t require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.
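A sketch of the second, non-transactional approach: treat the (topic, partition, offset) triple as a primary key when loading, so replays after a consumer restart overwrite rather than duplicate. The in-memory map here is only a stand-in for whatever storage system you actually load into.

{code}
import java.util.HashMap;
import java.util.Map;

public class OffsetKeyedDedup {
    // Stand-in for the destination store; key = "topic/partition/offset".
    private final Map<String, byte[]> store = new HashMap<String, byte[]>();

    public void load(String topic, int partition, long offset, byte[] payload) {
        String key = topic + "/" + partition + "/" + offset;
        // Re-delivered messages map to the same key, so replays are idempotent.
        store.put(key, payload);
    }
}
{code}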

I think there are two improvements that would make this a lot easier:

1. Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.

2. The existing high-level consumer doesn’t expose a lot of the more fine grained control of offsets (e.g. to reset your position). We will be working on that soon.

Why can’t I specify per-topic stream parallelism in the topic map when using a wildcard stream, as I can with the static stream handler?

The reason we do not have a per-topic parallelism specification for wildcards is that with the wildcard topicFilter, we will not know exactly which topics to consume at construction time, hence there is no way to specify per-topic specs.

How to consume large messages?

First you need to make sure these large messages can be accepted by the Kafka brokers:

{code}

message.max.bytes

{code}

controls the maximum size of a message that can be accepted at the broker, and any single message (including the wrapper message for compressed message set) whose size is larger than this value will be rejected for producing.

Then you need to make sure consumers can fetch such large messages on brokers:

{code}

fetch.message.max.bytes

{code}

controls the maximum number of bytes a consumer issues in one fetch. If it is less than a message’s size, the fetch will be blocked on that message and keep retrying.
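For example, to allow messages of up to 10 MB end to end (a placeholder value), the two settings above must line up:

{code}
# broker, server.properties
message.max.bytes=10485760

# consumer, config/consumer.properties
fetch.message.max.bytes=10485760
{code}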

How do we migrate to committing offsets to Kafka (rather than Zookeeper) in 0.8.2?

(Answer provided by Jon Bringhurst on mailing list)

A summary of the migration procedure is:

1) Upgrade your brokers and set dual.commit.enabled=false and offsets.storage=zookeeper (Commit offsets to Zookeeper Only).

2) Set dual.commit.enabled=true and offsets.storage=kafka and restart (Commit offsets to Zookeeper and Kafka).

3) Set dual.commit.enabled=false and offsets.storage=kafka and restart (Commit offsets to Kafka only).
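For step 2, for instance, every consumer in the group would run with a config fragment like this before moving on to step 3:

{code}
# consumer.properties during the dual-commit phase (step 2)
offsets.storage=kafka
dual.commit.enabled=true
{code}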

Brokers

How does Kafka depend on Zookeeper?

Starting from 0.9, we are removing all the Zookeeper dependency from the clients (for details one can check this page). However, the brokers will continue to be heavily dependent on Zookeeper for:

1. Server failure detection.

2. Data partitioning.

3. In-sync data replication.

4. Consumer membership management.

Once the Zookeeper quorum is down, brokers can end up in a bad state and cannot serve client requests normally. Although the Kafka brokers should be able to resume to the normal state automatically when the Zookeeper quorum recovers, there are still a few corner cases where they cannot, and a hard kill-and-recovery is required to bring them back to normal. Hence it is recommended to closely monitor your zookeeper cluster and provision it so that it is performant.

Also note that if Zookeeper was hard killed previously, upon restart it may not successfully load all the data and update their creation timestamp. To resolve this you can clean-up the data directory of the Zookeeper before restarting (if you have critical metadata such as consumer offsets you would need to export / import them before / after you cleanup the Zookeeper data and restart the server).

Why do I see error “Should not set log end offset on partition” in the broker log?

Typically, you will see errors like the following.

kafka.common.KafkaException: Should not set log end offset on partition [test,22]’s local replica 4

ERROR [ReplicaFetcherThread-0-6], Error for partition [test,22] to broker 6:class kafka.common.UnknownException(kafka.server.ReplicaFetcherThread)

A common problem is that more than one broker registered the same host/port in Zookeeper. As a result, the replica fetcher is confused when fetching data from the leader. To verify that, you can use a Zookeeper client shell to list the registration info of each broker. The Zookeeper path and the format of the broker registration is described in Kafka data structures in Zookeeper. You want to make sure that all the registered brokers have unique host/port.

Why does controlled shutdown fail?

If a controlled shutdown attempt fails, you will see error messages like the following in your broker logs

WARN [Kafka Server 0], Retrying controlled shutdown after the previous attempt failed… (kafka.server.KafkaServer)

WARN [Kafka Server 0], Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed

In addition to these error messages, if you also see SocketTimeoutExceptions, it indicates that the controller could not finish moving the leaders for all partitions on the broker within controller.socket.timeout.ms. The solution is to increase controller.socket.timeout.ms as well as increase controlled.shutdown.retry.backoff.ms and controlled.shutdown.max.retries to give enough time for the controlled shutdown to complete. If you don’t see SocketTimeoutExceptions, it could indicate a problem in your cluster state or a bug as this happens when the controller is not able to move the leaders to another broker for several retries.

Why can’t my consumers/producers connect to the brokers?

When a broker starts up, it registers its ip/port in ZK. You need to make sure the registered ip is consistent with what’s listed in metadata.broker.list in the producer config. By default, the registered ip is given by InetAddress.getLocalHost.getHostAddress. Typically, this should return the real ip of the host. However, sometimes (e.g., in EC2), the returned ip is an internal one and can’t be connected to from outside. The solution is to explicitly set the host ip to be registered in ZK by setting the “hostname” property in server.properties. In another rare case where the binding host/port is different from the host/port for client connection, you can set advertised.host.name and advertised.port for client connection.
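A sketch of the relevant server.properties entries; the addresses and port are placeholders, and the registered-host property is assumed to be host.name in 0.8 (referred to as “hostname” above).

{code}
# server.properties (illustrative)
host.name=10.0.0.12                         # ip the broker binds to and registers in ZK
advertised.host.name=broker1.example.com    # what clients should connect to, if different
advertised.port=9092
{code}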

Why do partition leaders migrate themselves sometimes?

During a broker soft failure, e.g., a long GC, its session on ZooKeeper may time out and hence be treated as failed. Upon detecting this situation, Kafka will migrate all the partition leaderships it currently hosts to other replicas. And once the broker resumes from the soft failure, it can only act as the follower replica of the partitions it originally led.

To move the leadership back to the brokers, one can use the preferred-leader-election tool here. Also, in 0.8.2 a new feature will be added which periodically triggers this functionality (details here).

To reduce Zookeeper session expiration, either tune the GC or increase zookeeper.session.timeout.ms in the broker config.

How many topics can I have?

Unlike many messaging systems Kafka topics are meant to scale up arbitrarily. Hence we encourage fewer large topics rather than many small topics. So for example if we were storing notifications for users we would encourage a design with a single notifications topic partitioned by user id rather than a separate topic per user.

The actual scalability is for the most part determined by the number of total partitions across all topics not the number of topics itself (see the question below for details).

How do I choose the number of partitions for a topic?

There isn’t really a right answer, we expose this as an option because it is a tradeoff. The simple answer is that the partition count determines the maximum consumer parallelism and so you should set a partition count based on the maximum consumer parallelism you would expect to need (i.e. over-provision). Clusters with up to 10k total partitions are quite workable. Beyond that we don’t aggressively test (it should work, but we can’t guarantee it).

Here is a more complete list of tradeoffs to consider:

  • A partition is basically a directory of log files.

  • Each partition must fit entirely on one machine. So if you have only one partition in your topic you cannot scale your write rate or retention beyond the capability of a single machine. If you have 1000 partitions you could potentially use 1000 machines.

  • Each partition is totally ordered. If you want a total order over all writes you probably want to have just one partition.

  • Each partition is not consumed by more than one consumer thread/process in each consumer group. This allows each process to consume in a single threaded fashion to guarantee ordering to the consumer within the partition (if we split up a partition of ordered messages and handed them out to multiple consumers even though the messages were stored in order they would be processed out of order at times).

  • Many partitions can be consumed by a single process, though. So you can have 1000 partitions all consumed by a single process.

  • Another way to say the above is that the partition count is a bound on the maximum consumer parallelism.

  • More partitions will mean more files and hence can lead to smaller writes if you don’t have enough memory to properly buffer the writes and coalesce them into larger writes.

  • Each partition corresponds to several znodes in zookeeper. Zookeeper keeps everything in memory so this can eventually get out of hand.

  • More partitions means longer leader fail-over time. Each partition can be handled quickly (milliseconds) but with thousands of partitions this can add up.

  • When we checkpoint the consumer position we store one offset per partition so the more partitions the more expensive the position checkpoint is.

  • It is possible to later expand the number of partitions BUT when we do so we do not attempt to reorganize the data in the topic. So if you are depending on key-based semantic partitioning in your processing you will have to manually copy data from the old low partition topic to a new higher partition topic if you later need to expand.

Note that I/O and file counts are really about #partitions/#brokers, so adding brokers will fix problems there; but zookeeper handles all partitions for the whole cluster so adding machines doesn’t help.

Why do I see lots of Leader not local exceptions on the broker during controlled shutdown?

This happens when the producer clients are using num.acks=0. When the leadership for a partition is changed, the clients (producer and consumer) get an error when they try to produce or consume from the old leader while waiting for a response. The client then refreshes the partition metadata from zookeeper, gets the new leader for the partition, and retries. This does not work for the producer client when ack = 0, because the producer does not wait for a response and hence does not know about the leadership change. The client would end up losing messages till the shutdown broker is brought back up. This issue is fixed in KAFKA-955.

How do I reduce churn in ISR? When does a broker leave the ISR?

ISR is a set of replicas that are fully sync-ed up with the leader. In other words, every replica in ISR has all messages that are committed. In an ideal system, ISR should always include all replicas unless there is a real failure. A replica will be dropped out of ISR if it diverges from the leader. This is controlled by two parameters: replica.lag.time.max.ms and replica.lag.max.messages. The former is typically set to a value that reliably detects the failure of a broker. We have a min fetch rate JMX in the broker. If that rate is n, set the former to a value larger than 1/n * 1000. The latter is typically set to the observed max lag (a JMX bean) in the follower. Note that if replica.lag.max.messages is too large, it can increase the time to commit a message. If latency becomes a problem, you can increase the number of partitions in a topic.

If a replica constantly drops out of and rejoins ISR, you may need to increase replica.lag.max.messages. If a replica stays out of ISR for a long time, it may indicate that the follower is not able to fetch data as fast as data is accumulated at the leader. You can increase the follower’s fetch throughput by setting a larger value for num.replica.fetchers.

After bouncing a broker, why do I see LeaderNotAvailable or NotLeaderForPartition exceptions on startup?

If you don’t use controlled shutdown, some partitions that had leaders on the broker being bounced go offline immediately. The controller takes some time to elect leaders and notify the brokers to assume the new leader role. Following this, clients take some time to send metadata requests and discover the new leaders. If the broker is stopped and restarted quickly, clients that have not discovered the new leader keep sending requests to the newly restarted broker. The exceptions are thrown since the newly restarted broker is not the leader for any partition.

Can I add new brokers dynamically to a cluster?

Yes, new brokers can be added online to a cluster. Those new brokers won’t have any data initially until either some new topics are created or some replicas are moved to them using the partition reassignment tool.

How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?

Kafka allows querying offsets of messages by time and it does so at segment granularity. The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.

For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.
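A sketch of such a query using the 0.8 SimpleConsumer Java API; the broker host, port, topic, clientId, and timestamp below are placeholders.

{code}
import java.util.Collections;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetByTimeExample {
    public static void main(String[] args) {
        SimpleConsumer consumer =
            new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-lookup");
        TopicAndPartition tp = new TopicAndPartition("topic1", 0);

        long timestamp = 1388534400000L;   // unix time in ms; -1 = latest, -2 = earliest
        OffsetRequest request = new OffsetRequest(
            Collections.singletonMap(tp, new PartitionOffsetRequestInfo(timestamp, 1)),
            kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup");

        OffsetResponse response = consumer.getOffsetsBefore(request);
        // Segment-granularity result: start offset of the latest segment created no later than 'timestamp'.
        long[] offsets = response.offsets("topic1", 0);
        System.out.println(offsets.length > 0 ? offsets[0] : -1);
        consumer.close();
    }
}
{code}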
