camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject camel git commit: CAMEL-8636 Committed the last batch of message when the auto commit is false
Date Wed, 15 Apr 2015 06:46:11 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x 31cbc3c1c -> 9912202b4


CAMEL-8636 Committed the last batch of message when  the auto commit is false


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9912202b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9912202b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9912202b

Branch: refs/heads/camel-2.15.x
Commit: 9912202b4e1b184dcae69d806d43842fa4af2ecd
Parents: 31cbc3c
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Wed Apr 15 14:39:41 2015 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Wed Apr 15 14:39:41 2015 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9912202b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index d6b49d2..46d258d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -136,29 +136,31 @@ public class KafkaConsumer extends DefaultConsumer {
             boolean consumerTimeout;
             MessageAndMetadata<byte[], byte[]> mm = null;
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
-
-            while (true) {
+            boolean hasNext = true;
+            while (hasNext) {
 
                 try {
                     consumerTimeout = false;
                     if (it.hasNext()) {
                         mm = it.next();
+                        Exchange exchange = endpoint.createKafkaExchange(mm);
+                        try {
+                            processor.process(exchange);
+                        } catch (Exception e) {
+                            LOG.error(e.getMessage(), e);
+                        }
+                        processed++;
                     } else {
-                        break;
-                    }
-                    Exchange exchange = endpoint.createKafkaExchange(mm);
-                    try {
-                        processor.process(exchange);
-                    } catch (Exception e) {
-                        LOG.error(e.getMessage(), e);
+                        // we don't need to process the message
+                        hasNext = false;
                     }
-                    processed++;
                 } catch (ConsumerTimeoutException e) {
                     LOG.debug(e.getMessage(), e);
                     consumerTimeout = true;
                 }
 
-                if (processed >= endpoint.getBatchSize() || consumerTimeout) {
+                if (processed >= endpoint.getBatchSize() || consumerTimeout 
+                    || (processed > 0 && !hasNext)) { // Need to commit the offset
for the last round
                     try {
                         berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
                         if (!consumerTimeout) {


Mime
View raw message