From: guozhang@apache.org
To: commits@kafka.apache.org
Subject: kafka git commit: KAFKA-2478: Fix manual committing example in javadoc
Date: Tue, 26 Jan 2016 19:05:09 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/0.9.0 920326a01 -> b84fef2ae


KAFKA-2478: Fix manual committing example in javadoc

Committing before inserting all records into the database might lead to some
records being lost. I've changed the example to commit only after all records
returned by `poll` are inserted into the database.
Author: Dmitry Stratiychuk

Reviewers: Jason Gustafson, Guozhang Wang

Closes #210 from shtratos/KAFKA-2478

(cherry picked from commit 82c219149027e8d96840af98d32fb1b877ab4ec2)
Signed-off-by: Guozhang Wang

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b84fef2a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b84fef2a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b84fef2a

Branch: refs/heads/0.9.0
Commit: b84fef2ae092e57724e144a6299bade4113c80a8
Parents: 920326a
Author: Dmitry Stratiychuk
Authored: Tue Jan 26 11:04:58 2016 -0800
Committer: Guozhang Wang
Committed: Tue Jan 26 11:05:05 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b84fef2a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 04045af..10fd8b9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -145,7 +145,7 @@ import java.util.regex.Pattern;
  * props.put("session.timeout.ms", "30000");
  * props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  * props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ * KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  * consumer.subscribe(Arrays.asList("foo", "bar"));
  * while (true) {
  *     ConsumerRecords<String, String> records = consumer.poll(100);
@@ -200,19 +200,19 @@ import java.util.regex.Pattern;
  * props.put("session.timeout.ms", "30000");
  * props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  * props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ * KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  * consumer.subscribe(Arrays.asList("foo", "bar"));
- * int commitInterval = 200;
- * List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
+ * final int minBatchSize = 200;
+ * List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
  * while (true) {
  *     ConsumerRecords<String, String> records = consumer.poll(100);
  *     for (ConsumerRecord<String, String> record : records) {
  *         buffer.add(record);
- *         if (buffer.size() >= commitInterval) {
- *             insertIntoDb(buffer);
- *             consumer.commitSync();
- *             buffer.clear();
- *         }
+ *     }
+ *     if (buffer.size() >= minBatchSize) {
+ *         insertIntoDb(buffer);
+ *         consumer.commitSync();
+ *         buffer.clear();
  *     }
  * }
  *
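
----------------------------------------------------------------------

As a reading aid, the corrected manual-commit pattern from the javadoc can be
expanded into a minimal, self-contained sketch. This is not part of the patch:
the broker address, group id, and topic names below are assumptions, and
insertIntoDb is a hypothetical placeholder for the application's own
persistence call, not a Kafka client API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "test");                      // assumed consumer group
        props.put("enable.auto.commit", "false");           // offsets committed manually
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));    // assumed topic names

        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            // Commit only after the whole batch has been persisted. If the
            // process crashes before insertIntoDb completes, no offset has been
            // committed yet, so the consumer re-reads the batch on restart
            // instead of silently losing it.
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    // Hypothetical persistence hook; replace with the application's own logic,
    // ideally writing the batch in a single transaction.
    private static void insertIntoDb(List<ConsumerRecord<String, String>> records) {
    }
}

----------------------------------------------------------------------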