camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [14/14] camel git commit: Polished
Date Fri, 03 Mar 2017 19:36:37 GMT
Polished


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c2f424c2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c2f424c2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c2f424c2

Branch: refs/heads/master
Commit: c2f424c211373514289722919230dc716828ee7f
Parents: 992af11
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Fri Mar 3 20:34:40 2017 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Mar 3 20:34:40 2017 +0100

----------------------------------------------------------------------
 .../kafka/KafkaIdempotentRepository.java        | 53 +++++++++++---------
 .../src/test/resources/log4j2.properties        |  2 +-
 2 files changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c2f424c2/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index cfb2ec5..4a5b98c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -31,7 +31,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.IOHelper;
@@ -53,7 +52,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * A Kafka topic-based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
  *
@@ -68,7 +66,7 @@ import org.slf4j.LoggerFactory;
  * same time), or replication factor of the topic.
  *
  * Each repository instance that uses the topic (e.g. typically on different machines running in parallel) controls its own
- * consumer group, so in a cluster of 10 camel processes using the same topic each will control its own offset.
+ * consumer group, so in a cluster of 10 Camel processes using the same topic each will control its own offset.
  *
  * On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the
  * latest state. The cache will not be considered warmed up until one poll of {@link #pollDurationMs} in length
@@ -232,8 +230,20 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     }
 
     @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return this.camelContext;
+    }
+
+    @Override
     protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "camelContext");
         StringHelper.notEmpty(topic, "topic");
+
         this.cache = Collections.synchronizedMap(new LRUCache<>(maxCacheSize));
 
         if (consumerConfig == null) {
@@ -271,32 +281,21 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
 
         cacheReadyLatch = new CountDownLatch(1);
         topicPoller = new TopicPoller(consumer, cacheReadyLatch, pollDurationMs);
-    }
 
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        // doStart() has already been called at this point
-        this.camelContext = camelContext;
-        ExecutorServiceManager executorServiceManager = camelContext.getExecutorServiceManager();
-        executorService = executorServiceManager.newSingleThreadExecutor(this, "KafkaIdempotentRepository");
+        // warm up the cache
+        executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "KafkaIdempotentRepository");
         executorService.submit(topicPoller);
-        log.info("Warming up cache");
+        log.info("Warming up cache from topic {}", topic);
         try {
             if (cacheReadyLatch.await(30, TimeUnit.SECONDS)) {
                 log.info("Cache OK");
             } else {
                 log.warn("Timeout waiting for cache warm-up from topic {}. Proceeding anyway. "
-                        + "Duplicate records may not be detected.", topic);
+                    + "Duplicate records may not be detected.", topic);
             }
         } catch (InterruptedException e) {
-            log.error("Interrupted: {}", e.getMessage());
+            log.warn("Interrupted while warming up cache. This exception is ignored.", e.getMessage());
         }
-
-    }
-
-    @Override
-    public CamelContext getCamelContext() {
-        return this.camelContext;
     }
 
     @Override
@@ -305,10 +304,12 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         topicPoller.setRunning(false);
         try {
             if (topicPoller.getShutdownLatch().await(30, TimeUnit.SECONDS)) {
-                log.info("Expired waiting on topicPoller to shut down");
+                log.info("Cache from topic {} shutdown successfully", topic);
+            } else {
+                log.warn("Timeout waiting for cache to shutdown from topic {}. Proceeding anyway.", topic);
             }
         } catch (InterruptedException e) {
-            log.info("Interrupted waiting on latch: {}", e.getMessage());
+            log.warn("Interrupted waiting on shutting down cache due {}. This exception is ignored.", e.getMessage());
         }
         camelContext.getExecutorServiceManager().shutdown(executorService);
 
@@ -387,8 +388,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         private final CountDownLatch cacheReadyLatch;
         private final int pollDurationMs;
 
-        private CountDownLatch shutdownLatch = new CountDownLatch(1);
-        private AtomicBoolean running = new AtomicBoolean(true);
+        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+        private final AtomicBoolean running = new AtomicBoolean(true);
 
         TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch, int pollDurationMs) {
             this.consumer = consumer;
@@ -458,5 +459,11 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         boolean isRunning() {
             return running.get();
         }
+
+        @Override
+        public String toString() {
+            return "TopicPoller[" + topic + "]";
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c2f424c2/components/camel-kafka/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties
index 07bcc08..ced0200 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -26,7 +26,7 @@ appender.stdout.layout.type = PatternLayout
 appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
 
 rootLogger.level = WARN
-rootLogger.appenderRef.out.ref = stdout
+rootLogger.appenderRef.out.ref = out
 
 logger.camel.name=org.apache.camel
 logger.camel.level=INFO


Mime
View raw message