Subject: Re: New Consumer API discussion
From: Neha Narkhede
To: dev@kafka.apache.org
Cc: users@kafka.apache.org
Date: Mon, 24 Mar 2014 17:29:56 -0700

Hey Chris,

Really sorry for the late reply; I wonder how this fell through the cracks. Anyhow, thanks for the great feedback! Here are my comments -

1. Why is the config String->Object instead of String->String?

This is probably more feedback about the new config management that we adopted in the new clients. I think it is more convenient to write configs.put("a", 42); instead of configs.put("a", Integer.toString(42));
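To make that concrete, here is a minimal sketch of what the typed config map buys you. The one-arg KafkaConsumer(Map<String, Object>) constructor is the one from the draft javadoc; the property keys are the string names discussed later in this mail, and the exact constant names in ConsumerConfig may still differ:

    import java.util.HashMap;
    import java.util.Map;

    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put("group.id", "test-group");        // String value
    configs.put("session.timeout.ms", 1000);      // int value, no Integer.toString() needed
    configs.put("auto.offset.reset", "smallest"); // String value
    KafkaConsumer consumer = new KafkaConsumer(configs);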
2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

There is no ConsumerRebalanceCallback parameter.

Fixed.

3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and unsubscribe before each poll. Is this a low-overhead operation? Can I just unsubscribe from everything after each poll, then re-subscribe to a topic the next iteration? I would probably be doing this in a fairly tight loop.

subscribe and unsubscribe are very lightweight in-memory operations, so it shouldn't be a problem to just use those APIs directly, even in a tight loop (see the first sketch at the end of this mail). Let me know if you think otherwise.

4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there are use cases for decoupling "what to do when no offset exists" from "what to do when I'm out of range". I might want to start from smallest the first time I run, but fail if I ever get offset out of range.

How about adding a third option, "disable", to "auto.offset.reset"? It would mean: never automatically reset the offset, neither when one is not found nor when the offset falls out of range. Presumably you would choose this when you want to control the offsets yourself and use custom rewind/replay logic, so that Kafka does not accidentally reset the offset to something else underneath you.

I'm not so sure when you would want to make the distinction between startup and the offset falling out of range. Presumably, if you don't trust Kafka to reset the offset, you can always turn this off and use commit/commitAsync and seek() to set the consumer to the right offset on startup and every time your consumer falls out of range (sketched at the end of this mail). Does that make sense?

5. ENABLE_JMX could use Java docs, even though it's fairly self-explanatory.

Fixed.

6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or across all topic/partitions is useful. I believe it's per-topic/partition, right? That is, setting to 2 megs with two TopicAndPartitions would result in 4 megs worth of data coming in per fetch, right?

Good point, clarified that. Take a look again to see if it makes sense now.

7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out? Retry, or throw exception?

It throws a TimeoutException. Clarified that in the docs.

8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and fetch requests?

It applies to all requests. Clarified that in the docs.

9. What does SESSION_TIMEOUT_MS default to?

Defaults are largely TODO, but session.timeout.ms currently defaults to 1000.

10. Is this consumer thread-safe?

It should be. Updated the docs to clarify that.

11. How do you use a different offset management strategy? Your email implies that it's pluggable, but I don't see how. "The offset management strategy defaults to Kafka based offset management and the API provides a way for the user to use a customized offset store to manage the consumer's offsets."

12. If I wish to decouple the consumer from the offset checkpointing, is it OK to use Joel's offset management stuff directly, rather than through the consumer's commit API?

For #11 and #12, I updated the docs to include actual usage examples (see also the last sketch below). Could you take a look and see if they answer your questions?
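To illustrate the answer to #3, the tight loop you describe would look roughly like the sketch below. This is only a sketch against the draft javadoc: I'm assuming the subscribe(TopicPartition...) and unsubscribe(TopicPartition...) overloads and a poll(timeout) that returns the fetched records grouped by topic, all of which may still change; process() is a hypothetical application callback.

    TopicPartition tp0 = new TopicPartition("my-topic", 0);
    TopicPartition tp1 = new TopicPartition("my-topic", 1);
    while (running) {
        // both calls are in-memory bookkeeping only, no broker round trip
        consumer.subscribe(tp0, tp1);
        Map<String, ConsumerRecords> records = consumer.poll(100); // timeout in ms
        process(records);
        consumer.unsubscribe(tp0, tp1);
    }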
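For #4, the "control the offsets yourself" mode would look something like the following. Note that the "disable" value is only proposed in this thread and not implemented yet, loadOffsetFromMyStore() is a hypothetical stand-in for whatever custom offset store you use, and the seek(partition, offset) form shown is my assumption about the draft API:

    configs.put("auto.offset.reset", "disable");  // proposed third option, see above
    KafkaConsumer consumer = new KafkaConsumer(configs);
    TopicPartition tp = new TopicPartition("my-topic", 0);
    consumer.subscribe(tp);
    // position the consumer explicitly instead of relying on auto reset,
    // both on startup and whenever it falls out of range
    consumer.seek(tp, loadOffsetFromMyStore(tp));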
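And for #11/#12, decoupling consumption from checkpointing might look like the sketch below, continuing from the consumer and tp set up above. Here saveOffsetToMyStore() is a hypothetical hook into an external checkpoint manager (e.g., Samza's CheckpointManager, or the offset management API directly), used in place of the consumer's commit()/commitAsync():

    while (running) {
        Map<String, ConsumerRecords> records = consumer.poll(100);
        long lastProcessed = process(records);      // hypothetical: returns last offset handled
        // checkpoint out-of-band instead of calling consumer.commit()/commitAsync()
        saveOffsetToMyStore(tp, lastProcessed + 1); // hypothetical external store
    }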
Thanks,
Neha

On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini wrote:

> Hey Guys,
>
> Also, for reference, we'll be looking to implement new Samza consumers
> which have these APIs:
>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html
>
> Question (3) below is a result of having Samza's SystemConsumers poll
> allow specific topic/partitions to be specified.
>
> The split between consumer and checkpoint manager is the reason for
> question (12) below.
>
> Cheers,
> Chris
>
> On 3/3/14 10:19 AM, "Chris Riccomini" wrote:
>
> >Hey Guys,
> >
> >Sorry for the late follow up. Here are my questions/thoughts on the API:
> >
> >1. Why is the config String->Object instead of String->String?
> >
> >2. Are these Java docs correct?
> >
> >    KafkaConsumer(java.util.Map configs)
> >    A consumer is instantiated by providing a set of key-value pairs as
> >    configuration and a ConsumerRebalanceCallback implementation
> >
> >There is no ConsumerRebalanceCallback parameter.
> >
> >3. Would like to have a method:
> >
> >    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
> >    TopicPartition... topicAndPartitionsToPoll)
> >
> >I see I can effectively do this by just fiddling with subscribe and
> >unsubscribe before each poll. Is this a low-overhead operation? Can I just
> >unsubscribe from everything after each poll, then re-subscribe to a topic
> >the next iteration. I would probably be doing this in a fairly tight loop.
> >
> >4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
> >are use cases for decoupling "what to do when no offset exists" from "what
> >to do when I'm out of range". I might want to start from smallest the
> >first time I run, but fail if I ever get offset out of range.
> >
> >5. ENABLE_JMX could use Java docs, even though it's fairly
> >self-explanatory.
> >
> >6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
> >across all topic/partitions is useful. I believe it's per-topic/partition,
> >right? That is, setting to 2 megs with two TopicAndPartitions would result
> >in 4 megs worth of data coming in per fetch, right?
> >
> >7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
> >Retry, or throw exception?
> >
> >8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
> >fetch requests?
> >
> >9. What does SESSION_TIMEOUT_MS default to?
> >
> >10. Is this consumer thread-safe?
> >
> >11. How do you use a different offset management strategy? Your email
> >implies that it's pluggable, but I don't see how. "The offset management
> >strategy defaults to Kafka based offset management and the API provides a
> >way for the user to use a customized offset store to manage the consumer's
> >offsets."
> >
> >12. If I wish to decouple the consumer from the offset checkpointing, is
> >it OK to use Joel's offset management stuff directly, rather than through
> >the consumer's commit API?
> >
> >Cheers,
> >Chris
> >
> >On 2/10/14 10:54 AM, "Neha Narkhede" wrote:
> >
> >>As mentioned in previous emails, we are also working on a
> >>re-implementation of the consumer. I would like to use this email thread
> >>to discuss the details of the public API. I would also like us to be
> >>picky about this public api now so it is as good as possible and we
> >>don't need to break it in the future.
> >>
> >>The best way to get a feel for the API is actually to take a look at the
> >>javadoc:
> >>http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >>The hope is to get the api docs good enough so that they are
> >>self-explanatory. You can also take a look at the configs here:
> >>http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> >>
> >>Some background info on implementation:
> >>
> >>At a high level, the primary difference in this consumer is that it
> >>removes the distinction between the "high-level" and "low-level"
> >>consumer. The new consumer API is non-blocking: instead of returning a
> >>blocking iterator, the consumer provides a poll() API that returns a
> >>list of records. We think this is better than the blocking iterators
> >>since it effectively decouples the threading strategy used for
> >>processing messages from the consumer. It is worth noting that the
> >>consumer is entirely single-threaded and runs in the user thread. The
> >>advantage is that it can be easily rewritten in less
> >>multi-threading-friendly languages. The consumer batches data and
> >>multiplexes I/O over TCP connections to each of the brokers it
> >>communicates with, for high throughput. The consumer also allows long
> >>poll to reduce the end-to-end message latency for low-throughput data.
> >>
> >>The consumer provides a group management facility that supports the
> >>concept of a group with multiple consumer instances (just like the
> >>current consumer). This is done through a custom heartbeat and group
> >>management protocol transparent to the user. At the same time, it
> >>allows users the option to subscribe to a fixed set of partitions and
> >>not use group management at all. The offset management strategy
> >>defaults to Kafka-based offset management, and the API provides a way
> >>for the user to use a customized offset store to manage the consumer's
> >>offsets.
> >>
> >>A key difference in this consumer is also that it does not depend on
> >>zookeeper at all.
> >>
> >>More details about the new consumer design are here:
> >>https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >>
> >>Please take a look at the new API and give us any thoughts you may have:
> >>http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >>
> >>Thanks,
> >>Neha