Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E2D9F200C2C for ; Fri, 3 Mar 2017 20:36:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E18F4160B5E; Fri, 3 Mar 2017 19:36:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D7930160B89 for ; Fri, 3 Mar 2017 20:36:25 +0100 (CET) Received: (qmail 74034 invoked by uid 500); 3 Mar 2017 19:36:24 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 73270 invoked by uid 99); 3 Mar 2017 19:36:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Mar 2017 19:36:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 520BDE943C; Fri, 3 Mar 2017 19:36:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Fri, 03 Mar 2017 19:36:37 -0000 Message-Id: <9e5656f70e7545eba32d3503e7d8735b@git.apache.org> In-Reply-To: <299e5ecd3ad34bdbbc279b8566e1c4a1@git.apache.org> References: <299e5ecd3ad34bdbbc279b8566e1c4a1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/14] camel git commit: Polished archived-at: Fri, 03 Mar 2017 19:36:27 -0000 Polished Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c2f424c2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c2f424c2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c2f424c2 Branch: refs/heads/master Commit: c2f424c211373514289722919230dc716828ee7f Parents: 992af11 Author: Claus Ibsen Authored: Fri Mar 3 20:34:40 2017 +0100 Committer: Claus Ibsen Committed: Fri Mar 3 20:34:40 2017 +0100 ---------------------------------------------------------------------- .../kafka/KafkaIdempotentRepository.java | 53 +++++++++++--------- .../src/test/resources/log4j2.properties | 2 +- 2 files changed, 31 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c2f424c2/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java index cfb2ec5..4a5b98c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java @@ -31,7 +31,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.IOHelper; @@ -53,7 +52,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * A Kafka topic-based implementation of {@link org.apache.camel.spi.IdempotentRepository}. * @@ -68,7 +66,7 @@ import org.slf4j.LoggerFactory; * same time), or replication factor of the topic. * * Each repository instance that uses the topic (e.g. typically on different machines running in parallel) controls its own - * consumer group, so in a cluster of 10 camel processes using the same topic each will control its own offset. + * consumer group, so in a cluster of 10 Camel processes using the same topic each will control its own offset. * * On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the * latest state. The cache will not be considered warmed up until one poll of {@link #pollDurationMs} in length @@ -232,8 +230,20 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot } @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return this.camelContext; + } + + @Override protected void doStart() throws Exception { + ObjectHelper.notNull(camelContext, "camelContext"); StringHelper.notEmpty(topic, "topic"); + this.cache = Collections.synchronizedMap(new LRUCache<>(maxCacheSize)); if (consumerConfig == null) { @@ -271,32 +281,21 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot cacheReadyLatch = new CountDownLatch(1); topicPoller = new TopicPoller(consumer, cacheReadyLatch, pollDurationMs); - } - @Override - public void setCamelContext(CamelContext camelContext) { - // doStart() has already been called at this point - this.camelContext = camelContext; - ExecutorServiceManager executorServiceManager = camelContext.getExecutorServiceManager(); - executorService = executorServiceManager.newSingleThreadExecutor(this, "KafkaIdempotentRepository"); + // warm up the cache + executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "KafkaIdempotentRepository"); executorService.submit(topicPoller); - log.info("Warming up cache"); + log.info("Warming up cache from topic {}", topic); try { if (cacheReadyLatch.await(30, TimeUnit.SECONDS)) { log.info("Cache OK"); } else { log.warn("Timeout waiting for cache warm-up from topic {}. Proceeding anyway. " - + "Duplicate records may not be detected.", topic); + + "Duplicate records may not be detected.", topic); } } catch (InterruptedException e) { - log.error("Interrupted: {}", e.getMessage()); + log.warn("Interrupted while warming up cache. This exception is ignored.", e.getMessage()); } - - } - - @Override - public CamelContext getCamelContext() { - return this.camelContext; } @Override @@ -305,10 +304,12 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot topicPoller.setRunning(false); try { if (topicPoller.getShutdownLatch().await(30, TimeUnit.SECONDS)) { - log.info("Expired waiting on topicPoller to shut down"); + log.info("Cache from topic {} shutdown successfully", topic); + } else { + log.warn("Timeout waiting for cache to shutdown from topic {}. Proceeding anyway.", topic); } } catch (InterruptedException e) { - log.info("Interrupted waiting on latch: {}", e.getMessage()); + log.warn("Interrupted waiting on shutting down cache due {}. This exception is ignored.", e.getMessage()); } camelContext.getExecutorServiceManager().shutdown(executorService); @@ -387,8 +388,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot private final CountDownLatch cacheReadyLatch; private final int pollDurationMs; - private CountDownLatch shutdownLatch = new CountDownLatch(1); - private AtomicBoolean running = new AtomicBoolean(true); + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + private final AtomicBoolean running = new AtomicBoolean(true); TopicPoller(Consumer consumer, CountDownLatch cacheReadyLatch, int pollDurationMs) { this.consumer = consumer; @@ -458,5 +459,11 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot boolean isRunning() { return running.get(); } + + @Override + public String toString() { + return "TopicPoller[" + topic + "]"; + } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/c2f424c2/components/camel-kafka/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties index 07bcc08..ced0200 100644 --- a/components/camel-kafka/src/test/resources/log4j2.properties +++ b/components/camel-kafka/src/test/resources/log4j2.properties @@ -26,7 +26,7 @@ appender.stdout.layout.type = PatternLayout appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n rootLogger.level = WARN -rootLogger.appenderRef.out.ref = stdout +rootLogger.appenderRef.out.ref = out logger.camel.name=org.apache.camel logger.camel.level=INFO