From: Chris Riccomini
To: "users@kafka.apache.org", "dev@kafka.apache.org"
Subject: Re: New Consumer API discussion
Date: Mon, 3 Mar 2014 18:19:03 +0000

Hey Guys,

Sorry for the late follow up. Here are my questions/thoughts on the API:

1. Why is the config String->Object instead of String->String?

2. Are these Java docs correct?

     KafkaConsumer(java.util.Map configs)

   "A consumer is instantiated by providing a set of key-value pairs as
   configuration and a ConsumerRebalanceCallback implementation."

   There is no ConsumerRebalanceCallback parameter.

3. Would like to have a method:

     poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
          TopicPartition... topicAndPartitionsToPoll)

   I see I can effectively do this by just fiddling with subscribe and
   unsubscribe before each poll. Is this a low-overhead operation? Can I
   just unsubscribe from everything after each poll, then re-subscribe to
   a topic the next iteration? I would probably be doing this in a fairly
   tight loop. (I've sketched the pattern I mean below, after these
   questions.)
4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
   are use cases for decoupling "what to do when no offset exists" from
   "what to do when I'm out of range". I might want to start from
   smallest the first time I run, but fail if I ever get an offset out of
   range.

5. ENABLE_JMX could use Java docs, even though it's fairly
   self-explanatory.

6. It would be useful to clarify whether FETCH_BUFFER_CONFIG is per
   topic/partition, or across all topic/partitions. I believe it's per
   topic/partition, right? That is, setting it to 2 megs with two
   TopicAndPartitions would result in 4 megs worth of data coming in per
   fetch, right?

7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
   Retry, or throw an exception?

8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
   fetch requests?

9. What does SESSION_TIMEOUT_MS default to?

10. Is this consumer thread-safe?

11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset
    management strategy defaults to Kafka based offset management and
    the API provides a way for the user to use a customized offset store
    to manage the consumer's offsets."

12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than
    through the consumer's commit API?
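Here's the pattern from 3 as a sketch, just so we're talking about the
same thing. I'm assuming subscribe/unsubscribe take TopicPartition
varargs as in the draft javadoc, and guessing at poll()'s signature and
return type; nextPartition() and process() are placeholders of mine:

    // Hypothetical sketch of the subscribe/unsubscribe-per-poll pattern
    // that a poll(timeout, unit, partitions...) method would replace.
    KafkaConsumer consumer = new KafkaConsumer(configs);
    while (true) {
        TopicPartition tp = nextPartition();  // placeholder: choose this iteration's partition
        consumer.subscribe(tp);               // is this cheap enough for a tight loop?
        Map<String, ConsumerRecords> records = consumer.poll(100);  // timeout in ms (guess)
        process(records);                     // placeholder: handle the returned records
        consumer.unsubscribe(tp);             // drop everything again before the next iteration
    }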
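And for 12, roughly what I have in mind. OffsetStore is just a stand-in
for Joel's offset management work (none of its method names are real),
and I'm still guessing at the consumer-side signatures:

    // Sketch: checkpoint offsets to an external store and never call the
    // consumer's commit API.
    KafkaConsumer consumer = new KafkaConsumer(configs);  // auto-commit disabled
    consumer.subscribe(new TopicPartition("my-topic", 0));
    OffsetStore store = new OffsetStore();                // hypothetical external offset store
    while (true) {
        Map<String, ConsumerRecords> records = consumer.poll(100);
        long lastOffset = process(records);               // placeholder: returns last offset consumed
        store.save("my-topic", 0, lastOffset);            // checkpoint outside the consumer
        // consumer.commit() is deliberately never called
    }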
Cheers,
Chris

On 2/10/14 10:54 AM, "Neha Narkhede" wrote:

>As mentioned in previous emails, we are also working on a
>re-implementation of the consumer. I would like to use this email thread
>to discuss the details of the public API. I would also like us to be
>picky about this public API now so it is as good as possible and we
>don't need to break it in the future.
>
>The best way to get a feel for the API is actually to take a look at the
>javadoc <.../kafka/clients/consumer/KafkaConsumer.html>; the hope is to
>get the API docs good enough so that it is self-explanatory. You can
>also take a look at the configs here
><.../kafka/clients/consumer/ConsumerConfig.html>.
>
>Some background info on implementation:
>
>At a high level the primary difference in this consumer is that it
>removes the distinction between the "high-level" and "low-level"
>consumer. The new consumer API is non-blocking: instead of returning a
>blocking iterator, the consumer provides a poll() API that returns a
>list of records. We think this is better than the blocking iterators
>since it effectively decouples the threading strategy used for
>processing messages from the consumer. It is worth noting that the
>consumer is entirely single-threaded and runs in the user thread. The
>advantage is that it can be easily rewritten in less
>multi-threading-friendly languages. The consumer batches data and
>multiplexes I/O over the TCP connections to each of the brokers it
>communicates with, for high throughput. The consumer also allows long
>poll to reduce the end-to-end message latency for low-throughput data.
>
>The consumer provides a group management facility that supports the
>concept of a group with multiple consumer instances (just like the
>current consumer). This is done through a custom heartbeat and group
>management protocol transparent to the user. At the same time, it allows
>users the option to subscribe to a fixed set of partitions and not use
>group management at all. The offset management strategy defaults to
>Kafka based offset management and the API provides a way for the user to
>use a customized offset store to manage the consumer's offsets.
>
>A key difference in this consumer also is the fact that it does not
>depend on ZooKeeper at all.
>
>More details about the new consumer design are here
><...Rewrite+Design>.
>
>Please take a look at the new API
><.../kafka/clients/consumer/KafkaConsumer.html> and give us any thoughts
>you may have.
>
>Thanks,
>Neha
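A minimal consumption loop against the API described above might look
like the following sketch. Class and method names follow the draft
javadoc linked in the thread; poll()'s return type and the process()
helper are assumptions:

    // Group-managed subscription: subscribing by topic name lets the
    // heartbeat/group management protocol assign partitions across the
    // consumer instances sharing a group.
    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put("group.id", "my-group");
    KafkaConsumer consumer = new KafkaConsumer(configs);
    consumer.subscribe("my-topic");
    // Or bypass group management with a fixed set of partitions:
    //   consumer.subscribe(new TopicPartition("my-topic", 0));
    while (true) {
        // Non-blocking style: poll() returns whatever records are ready
        // rather than handing back a blocking iterator, leaving the
        // processing threading model entirely to the caller.
        Map<String, ConsumerRecords> records = consumer.poll(1000);
        process(records);  // placeholder: application-specific handling
    }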