kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2478: Fix manual committing example in javadoc
Date Tue, 26 Jan 2016 19:05:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 920326a01 -> b84fef2ae


KAFKA-2478: Fix manual committing example in javadoc

Committing before inserting all records into the database
might lead to some records being lost.

I've changed the example to commit only after all records
returned by `poll` are inserted into the database.
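
The failure mode described above can be illustrated with a small simulation. This is only a sketch and does not use the Kafka client: the class name, the constants, and the in-memory "log" and "database" are all hypothetical. It assumes that a no-argument `commitSync()` commits the position reached by the last `poll`, that is, past every record that `poll` returned, whether or not the record has been processed yet.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOrderingSketch {
    // Hypothetical stand-in for one partition holding offsets 0..9.
    static final int LOG_END = 10;
    static final int POLL_SIZE = 5;      // records handed out per simulated poll
    static final int MIN_BATCH_SIZE = 3; // flush threshold, as in the javadoc example

    /**
     * Runs a simulated consumer until its first commit, then "crashes".
     * Returns the offsets below the committed position that never reached the database.
     */
    static List<Integer> lostRecords(boolean commitInsideLoop) {
        List<Integer> db = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        int position = 0;   // next offset the consumer would fetch
        int committed = -1; // nothing committed yet

        while (committed < 0 && position < LOG_END) {
            // Simulated poll: return the next POLL_SIZE offsets and advance the
            // position past all of them (assumed to mirror the consumer's behaviour).
            List<Integer> records = new ArrayList<>();
            int end = Math.min(position + POLL_SIZE, LOG_END);
            for (int offset = position; offset < end; offset++) {
                records.add(offset);
            }
            position = end;

            for (int offset : records) {
                buffer.add(offset);
                // Old example: flush and commit while still iterating the poll result.
                if (commitInsideLoop && buffer.size() >= MIN_BATCH_SIZE) {
                    db.addAll(buffer);
                    committed = position; // also covers records not yet buffered
                    buffer.clear();
                    break;                // simulated crash right after the commit
                }
            }
            // Fixed example: flush and commit only after every polled record is buffered.
            if (!commitInsideLoop && buffer.size() >= MIN_BATCH_SIZE) {
                db.addAll(buffer);
                committed = position;
                buffer.clear();
            }
        }

        // Anything below the committed position that is not in the database is lost,
        // because a restarted consumer would resume from the committed position.
        List<Integer> lost = new ArrayList<>();
        for (int offset = 0; offset < committed; offset++) {
            if (!db.contains(offset)) {
                lost.add(offset);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("commit inside loop: lost " + lostRecords(true));
        System.out.println("commit after loop:  lost " + lostRecords(false));
    }
}
```

With these made-up sizes, the old ordering commits position 5 after inserting only offsets 0 to 2, so offsets 3 and 4 are skipped on restart; the new ordering inserts all five polled records before committing.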

Author: Dmitry Stratiychuk <dstratiychuk@yammer-inc.com>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #210 from shtratos/KAFKA-2478

(cherry picked from commit 82c219149027e8d96840af98d32fb1b877ab4ec2)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b84fef2a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b84fef2a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b84fef2a

Branch: refs/heads/0.9.0
Commit: b84fef2ae092e57724e144a6299bade4113c80a8
Parents: 920326a
Author: Dmitry Stratiychuk <dstratiychuk@yammer-inc.com>
Authored: Tue Jan 26 11:04:58 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 26 11:05:05 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java     | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b84fef2a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 04045af..10fd8b9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -145,7 +145,7 @@ import java.util.regex.Pattern;
  *     props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
  *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
+ *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
  *     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
  *     while (true) {
  *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
@@ -200,19 +200,19 @@ import java.util.regex.Pattern;
  *     props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
  *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
+ *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
  *     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
- *     int commitInterval = 200;
- *     List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;ConsumerRecord&lt;String, String&gt;&gt;();
+ *     final int minBatchSize = 200;
+ *     List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;&gt;();
  *     while (true) {
  *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
  *         for (ConsumerRecord&lt;String, String&gt; record : records) {
  *             buffer.add(record);
- *             if (buffer.size() &gt;= commitInterval) {
- *                 insertIntoDb(buffer);
- *                 consumer.commitSync();
- *                 buffer.clear();
- *             }
+ *         }
+ *         if (buffer.size() &gt;= minBatchSize) {
+ *             insertIntoDb(buffer);
+ *             consumer.commitSync();
+ *             buffer.clear();
  *         }
  *     }
  * </pre>

