From Chris Riccomini <criccom...@linkedin.com>
Subject Re: New Consumer API discussion
Date Mon, 03 Mar 2014 18:28:21 GMT
Hey Guys,

Also, for reference, we'll be looking to implement new Samza consumers
which have these APIs:



Question (3) below stems from the fact that Samza's SystemConsumers poll
allows specific topic/partitions to be specified.
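
As a rough illustration of what question (3) is getting at, here is a minimal sketch of emulating a per-call partition list by subscribing, polling, and unsubscribing around every poll. Everything in it is an assumption made for illustration: SketchConsumer, FakeConsumer, and pollOnly are hypothetical names, topic/partitions are plain strings rather than TopicPartition objects, and the fake returns canned records instead of talking to a broker. It is not the proposed KafkaConsumer API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the proposed consumer; NOT the real Kafka API.
interface SketchConsumer {
    void subscribe(String... topicPartitions);
    void unsubscribe(String... topicPartitions);
    List<String> poll(long timeoutMs);
}

// In-memory fake so the sketch runs without a broker.
class FakeConsumer implements SketchConsumer {
    private final Set<String> subscribed = new LinkedHashSet<>();
    int subscriptionChanges = 0; // counts subscribe/unsubscribe calls

    public void subscribe(String... topicPartitions) {
        subscribed.addAll(Arrays.asList(topicPartitions));
        subscriptionChanges++;
    }

    public void unsubscribe(String... topicPartitions) {
        subscribed.removeAll(Arrays.asList(topicPartitions));
        subscriptionChanges++;
    }

    // Returns one canned record per currently subscribed topic/partition.
    public List<String> poll(long timeoutMs) {
        List<String> records = new ArrayList<>();
        for (String tp : subscribed) {
            records.add("record-from-" + tp);
        }
        return records;
    }
}

class SelectivePoll {
    // Emulates poll(timeout, partitions...) by subscribing, polling, then unsubscribing.
    static List<String> pollOnly(SketchConsumer consumer, long timeoutMs, String... topicPartitions) {
        consumer.subscribe(topicPartitions);
        try {
            return consumer.poll(timeoutMs);
        } finally {
            consumer.unsubscribe(topicPartitions);
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        System.out.println(pollOnly(consumer, 100, "topicA-0"));
        System.out.println(pollOnly(consumer, 100, "topicB-1", "topicB-2"));
        System.out.println("subscription changes: " + consumer.subscriptionChanges);
    }
}
```

Each emulated poll costs two subscription changes, which is why the overhead of subscribe and unsubscribe matters when this runs in a tight loop.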

The split between consumer and checkpoint manager is the reason for
question (12) below.
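
To make the split concrete, here is a small sketch of a processing loop that reads and writes its position through a separate offset store instead of a consumer commit call. The names OffsetStore, InMemoryOffsetStore, and processFromCheckpoint are hypothetical, the "log" is just an in-memory list, and none of this reflects Samza's or Kafka's actual interfaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical offset store, kept separate from the consumer itself.
interface OffsetStore {
    void write(String topicPartition, long offset);
    long readOrDefault(String topicPartition, long fallback);
}

// In-memory implementation so the sketch runs standalone.
class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();

    public void write(String topicPartition, long offset) {
        offsets.put(topicPartition, offset);
    }

    public long readOrDefault(String topicPartition, long fallback) {
        return offsets.getOrDefault(topicPartition, fallback);
    }
}

class DecoupledCheckpointing {
    // Handles entries from the checkpointed position onward, then stores
    // the next offset to read. Returns how many entries were handled.
    static int processFromCheckpoint(List<String> log, String topicPartition, OffsetStore store) {
        long start = store.readOrDefault(topicPartition, 0L);
        int processed = 0;
        for (long offset = start; offset < log.size(); offset++) {
            // Real message handling would go here.
            processed++;
        }
        store.write(topicPartition, log.size());
        return processed;
    }

    public static void main(String[] args) {
        OffsetStore store = new InMemoryOffsetStore();
        List<String> log = new ArrayList<>();
        log.add("m0");
        log.add("m1");
        System.out.println(processFromCheckpoint(log, "topicA-0", store));
        log.add("m2");
        System.out.println(processFromCheckpoint(log, "topicA-0", store));
    }
}
```

The point of the sketch is only that the fetch path never needs to know where offsets are persisted, which is the arrangement question (12) asks about.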


On 3/3/14 10:19 AM, "Chris Riccomini" <criccomini@linkedin.com> wrote:

>Hey Guys,
>Sorry for the late follow up. Here are my questions/thoughts on the API:
>1. Why is the config String->Object instead of String->String?
>2. Are these Java docs correct?
>  KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
>  A consumer is instantiated by providing a set of key-value pairs as
>configuration and a ConsumerRebalanceCallback implementation
>There is no ConsumerRebalanceCallback parameter.
>3. Would like to have a method:
>  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
>TopicPartition... topicAndPartitionsToPoll)
>I see I can effectively do this by just fiddling with subscribe and
>unsubscribe before each poll. Is this a low-overhead operation? Can I just
>unsubscribe from everything after each poll, then re-subscribe to a topic
>on the next iteration? I would probably be doing this in a fairly tight loop.
>4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
>are use cases for decoupling "what to do when no offset exists" from "what
>to do when I'm out of range". I might want to start from smallest the
>first time I run, but fail if I ever get offset out of range.
>5. ENABLE_JMX could use Java docs, even though it's fairly
>6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
>across all topic/partitions is useful. I believe it's per-topic/partition,
>right? That is, setting to 2 megs with two TopicAndPartitions would result
>in 4 megs worth of data coming in per fetch, right?
>7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
>Retry, or throw exception?
>8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
>fetch requests?
>9. What does SESSION_TIMEOUT_MS default to?
>10. Is this consumer thread-safe?
>11. How do you use a different offset management strategy? Your email
>implies that it's pluggable, but I don't see how. "The offset management
>strategy defaults to Kafka based offset management and the API provides a
>way for the user to use a customized offset store to manage the consumer's
>offsets."
>12. If I wish to decouple the consumer from the offset checkpointing, is
>it OK to use Joel's offset management stuff directly, rather than through
>the consumer's commit API?
>On 2/10/14 10:54 AM, "Neha Narkhede" <neha.narkhede@gmail.com> wrote:
>>As mentioned in previous emails, we are also working on a
>>of the consumer. I would like to use this email thread to discuss the
>>details of the public API. I would also like us to be picky about this
>>public api now so it is as good as possible and we don't need to break it
>>in the future.
>>The best way to get a feel for the API is actually to take a look at the
>>the hope is to get the api docs good enough so that it is
>>You can also take a look at the configs
>>Some background info on implementation:
>>At a high level the primary difference in this consumer is that it removes
>>the distinction between the "high-level" and "low-level" consumer. The
>>consumer API is non-blocking and instead of returning a blocking iterator,
>>the consumer provides a poll() API that returns a list of records. We believe
>>this is better compared to the blocking iterators since it effectively
>>decouples the threading strategy used for processing messages from the
>>consumer. It is worth noting that the consumer is entirely single-threaded
>>and runs in the user thread. The advantage is that it can be easily
>>rewritten in less multi-threading-friendly languages. The consumer
>>data and multiplexes I/O over TCP connections to each of the brokers it
>>communicates with, for high throughput. The consumer also allows long poll
>>to reduce the end-to-end message latency for low throughput data.
>>The consumer provides a group management facility that supports the
>>of a group with multiple consumer instances (just like the current
>>consumer). This is done through a custom heartbeat and group management
>>protocol transparent to the user. At the same time, it allows users the
>>option to subscribe to a fixed set of partitions and not use group
>>management at all. The offset management strategy defaults to Kafka based
>>offset management and the API provides a way for the user to use a
>>customized offset store to manage the consumer's offsets.
>>A key difference in this consumer also is the fact that it does not depend
>>on zookeeper at all.
>>More details about the new consumer design are
>>Please take a look at the new
>>give us any thoughts you may have.

