camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [2/2] camel git commit: Fixed CS
Date Fri, 29 Apr 2016 08:52:50 GMT
Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1eeba05d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1eeba05d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1eeba05d

Branch: refs/heads/master
Commit: 1eeba05d48a0ea12cc8bb4741bb7163d73281022
Parents: a107781
Author: Andrea Cosentino <ancosen@gmail.com>
Authored: Fri Apr 29 10:50:57 2016 +0200
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Fri Apr 29 10:52:24 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 51 +++++++++-----------
 1 file changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1eeba05d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 82600e7..8649a46 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -94,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final String threadId;
         private final Properties kafkaProps;
 
-        public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+        KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
@@ -117,33 +117,28 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
                     ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);
-                    // START : CAMEL-9823
-					for (TopicPartition partition : allRecords.partitions()) {
-						List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
-								.records(partition);
-	                    for (ConsumerRecord<Object, Object> record : partitionRecords) {
-	                        if (LOG.isTraceEnabled()) {
-	                            LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
-	                        }
-	                        Exchange exchange = endpoint.createKafkaExchange(record);
-	                        try {
-	                            processor.process(exchange);
-	                        } catch (Exception e) {
-	                            getExceptionHandler().handleException("Error during processing", exchange, e);
-	                        }
-	                    }
-						// if autocommit is false
-						if (endpoint.isAutoCommitEnable() != null
-								&& !endpoint.isAutoCommitEnable()) {
-							long partitionLastoffset = partitionRecords.get(
-									partitionRecords.size() - 1).offset();
-							consumer.commitSync(Collections.singletonMap(
-									partition, new OffsetAndMetadata(
-											partitionLastoffset + 1)));
-						}
-	                    
-					}
-					// END : CAMEL-9823
+                    for (TopicPartition partition : allRecords.partitions()) {
+                        List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
+                            .records(partition);
+                        for (ConsumerRecord<Object, Object> record : partitionRecords) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
+                            }
+                            Exchange exchange = endpoint.createKafkaExchange(record);
+                            try {
+                                processor.process(exchange);
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException("Error during processing", exchange, e);
+                            }
+                        }
+                        // if autocommit is false
+                        if (endpoint.isAutoCommitEnable() != null
+                            && !endpoint.isAutoCommitEnable()) {
+                            long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+                            consumer.commitSync(Collections.singletonMap(
+                                partition, new OffsetAndMetadata(partitionLastoffset + 1)));
+                        }
+                    }
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();


Mime
View raw message