From: davsclaus@apache.org
To: commits@camel.apache.org
Reply-To: dev@camel.apache.org
Date: Fri, 03 Mar 2017 19:36:34 -0000
Subject: [11/14] camel git commit: Refactored initialisation to allow property-based construction via XML DSL. Enabled consumer poll duration to be externally defined.

Refactored initialisation to allow property-based construction via XML DSL. Enabled consumer poll duration to be externally defined.
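To illustrate the construction style this commit enables: the no-arg constructor plus setters below mirror what XML property injection does. This is a minimal sketch only; the class name, topic name and broker address are placeholders, and only accessors introduced in this commit are used.

----------------------------------------------------------------------
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;

public class PropertyBasedConstructionSketch {

    public static void main(String[] args) {
        // Mirrors what XML property injection does: no-arg constructor, then setters.
        KafkaIdempotentRepository repo = new KafkaIdempotentRepository();
        repo.setTopic("my-idempotent-ids");          // placeholder topic name
        repo.setBootstrapServers("localhost:9092");  // placeholder broker address
        repo.setMaxCacheSize(1000);                  // optional; defaults to DEFAULT_MAXIMUM_CACHE_SIZE (1000)
        repo.setPollDurationMs(100);                 // optional; defaults to DEFAULT_POLL_DURATION_MS (100)

        // topic/bootstrapServers are only validated in doStart(), which runs when the
        // owning CamelContext starts the repository as a service.
    }
}
----------------------------------------------------------------------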
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff7fec34
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff7fec34
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff7fec34

Branch: refs/heads/master
Commit: ff7fec34141a00a7c1415675b4dffc38dfca62ed
Parents: e068386
Author: Jakub Korab
Authored: Fri Mar 3 13:44:15 2017 +0000
Committer: Claus Ibsen
Committed: Fri Mar 3 20:03:08 2017 +0100

----------------------------------------------------------------------
 .../kafka/KafkaIdempotentRepository.java | 163 ++++++++++++++++---
 1 file changed, 138 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ff7fec34/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 95c2ddd..ad0b670 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -75,15 +75,22 @@ import org.slf4j.LoggerFactory;
 public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository<String>, CamelContextAware {
 
     private static final int DEFAULT_MAXIMUM_CACHE_SIZE = 1000;
+    private static final int DEFAULT_POLL_DURATION_MS = 100;
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
-    private final Map<String, Object> cache;
     private final AtomicLong duplicateCount = new AtomicLong(0);
-    private final String topic;
-    private final Properties producerConfig;
-    private final Properties consumerConfig;
 
+    // configurable
+    private String topic;
+    private String bootstrapServers;
+    private Properties producerConfig;
+    private Properties consumerConfig;
+    private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
+    private int pollDurationMs = DEFAULT_POLL_DURATION_MS;
+
+    // internal properties
+    private Map<String, Object> cache;
     private Consumer<String, String> consumer;
     private Producer<String, String> producer;
     private TopicPoller topicPoller;
 
@@ -98,41 +105,147 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         clear
     }
 
+    /**
+     * No-op constructor for XML/property-based object initialisation. From Java, prefer one of the other constructors.
+     */
+    public KafkaIdempotentRepository() {
+    }
+
     public KafkaIdempotentRepository(String topic, String bootstrapServers) {
-        this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE);
+        this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
-    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize) {
+    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize, int pollDurationMs) {
         StringHelper.notEmpty(topic, "topic");
         StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        this.topic = topic;
+        this.bootstrapServers = bootstrapServers;
+        this.maxCacheSize = maxCacheSize;
+        this.pollDurationMs = pollDurationMs;
+    }
+
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig) {
+        this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
+    }
 
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize, int pollDurationMs) {
         this.topic = topic;
         this.consumerConfig = consumerConfig;
         this.producerConfig = producerConfig;
-        this.cache = Collections.synchronizedMap(new LRUCache<>(maxCacheSize));
+        this.maxCacheSize = maxCacheSize;
+        this.pollDurationMs = pollDurationMs;
     }
 
-    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig) {
-        this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE);
+    public String getTopic() {
+        return topic;
     }
 
-    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig, int maxCacheSize) {
-        StringHelper.notEmpty(topic, "topic");
+    /**
+     * Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository
+     * should use a different topic.
+     * @param topic The topic name.
+     */
+    public void setTopic(String topic) {
         this.topic = topic;
-        ObjectHelper.notNull(consumerConfig, "consumerConfig");
-        this.consumerConfig = consumerConfig;
-        ObjectHelper.notNull(producerConfig, "producerConfig");
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    /**
+     * Sets the bootstrap.servers property on the internal Kafka producer and consumer. Use this as shorthand
+     * if not setting {@link #consumerConfig} and {@link #producerConfig}. If used, this component will apply sensible
+     * default configurations for the producer and consumer.
+     * @param bootstrapServers The bootstrap.servers value to use.
+     */
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public Properties getProducerConfig() {
+        return producerConfig;
+    }
+
+    /**
+     * Sets the properties that will be used by the Kafka producer. Overrides {@link #bootstrapServers}, so must define
+     * the bootstrap.servers property itself.
+     *
+     * Prefer using {@link #bootstrapServers} for default configuration unless you specifically need non-standard
+     * configuration options such as SSL/SASL.
+     * @param producerConfig The producer configuration properties.
+     */
+    public void setProducerConfig(Properties producerConfig) {
         this.producerConfig = producerConfig;
-        this.cache = Collections.synchronizedMap(new LRUCache<>(maxCacheSize));
+    }
+
+    public Properties getConsumerConfig() {
+        return consumerConfig;
+    }
+
+    /**
+     * Sets the properties that will be used by the Kafka consumer. Overrides {@link #bootstrapServers}, so must define
+     * the bootstrap.servers property itself.
+     *
+     * Prefer using {@link #bootstrapServers} for default configuration unless you specifically need non-standard
+     * configuration options such as SSL/SASL.
+     * @param consumerConfig The consumer configuration properties.
+     */
+    public void setConsumerConfig(Properties consumerConfig) {
+        this.consumerConfig = consumerConfig;
+    }
+
+    public int getMaxCacheSize() {
+        return maxCacheSize;
+    }
+
+    /**
+     * Sets the maximum size of the local key cache.
+     * @param maxCacheSize The maximum key cache size.
+     */
+    public void setMaxCacheSize(int maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+    }
+
+    public int getPollDurationMs() {
+        return pollDurationMs;
+    }
+
+    /**
+     * Sets the poll duration of the Kafka consumer. The local caches are updated immediately; this value affects
+     * how far behind other peers in the cluster (which update their caches from the topic) lag relative to the
+     * idempotent consumer instance that issued the cache action message.
+     *
+     * The default value of this is {@link #DEFAULT_POLL_DURATION_MS}. If setting this value explicitly, be aware that
+     * there is a tradeoff between the remote cache liveness and the volume of network traffic between this
+     * repository's consumer and the Kafka brokers.
+     * @param pollDurationMs The poll duration in milliseconds.
+     */
+    public void setPollDurationMs(int pollDurationMs) {
+        this.pollDurationMs = pollDurationMs;
     }
 
     @Override
     protected void doStart() throws Exception {
+        StringHelper.notEmpty(topic, "topic");
+        this.cache = Collections.synchronizedMap(new LRUCache<>(maxCacheSize));
+
+        if (consumerConfig == null) {
+            consumerConfig = new Properties();
+            StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
+            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        }
+
+        if (producerConfig == null) {
+            producerConfig = new Properties();
+            StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
+            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        }
+
+        ObjectHelper.notNull(consumerConfig, "consumerConfig");
+        ObjectHelper.notNull(producerConfig, "producerConfig");
+
         // each consumer instance must have control over its own offset, so assign a groupID at random
         String groupId = UUID.randomUUID().toString();
         log.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -146,13 +259,13 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
-        // set up the producer to remove all batching on send
+        // set up the producer to remove all batching on send, we want all sends to be fully synchronous
         producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1");
         producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
 
         producer = new KafkaProducer<>(producerConfig);
 
         cacheReadyLatch = new CountDownLatch(1);
-        topicPoller = new TopicPoller(consumer, cacheReadyLatch);
+        topicPoller = new TopicPoller(consumer, cacheReadyLatch, pollDurationMs);
     }
 
     @Override
@@ -264,18 +377,18 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     private class TopicPoller implements Runnable {
 
-        private static final int POLL_DURATION_MS = 100;
-
         private final Logger log = LoggerFactory.getLogger(this.getClass());
         private final Consumer<String, String> consumer;
         private final CountDownLatch cacheReadyLatch;
+        private final int pollDurationMs;
 
         private CountDownLatch shutdownLatch = new CountDownLatch(1);
         private AtomicBoolean running = new AtomicBoolean(true);
 
-        TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch) {
+        TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch, int pollDurationMs) {
            this.consumer = consumer;
            this.cacheReadyLatch = cacheReadyLatch;
+           this.pollDurationMs = pollDurationMs;
         }
 
         @Override
@@ -287,7 +400,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
             POLL_LOOP: while (running.get()) {
                 log.trace("Polling");
-                ConsumerRecords<String, String> consumerRecords = consumer.poll(POLL_DURATION_MS);
+                ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
                 if (consumerRecords.isEmpty()) {
                     // the first time this happens, we can assume that we have consumed all
                     // messages up to this point
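As the setProducerConfig/setConsumerConfig javadoc above notes, supplying explicit client Properties overrides the bootstrapServers shorthand, so each Properties object must define bootstrap.servers itself. A minimal sketch of that path, using the extended five-argument constructor added in this commit; the class name, broker address and security setting are placeholders, not recommended values.

----------------------------------------------------------------------
import java.util.Properties;

import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class CustomClientConfigSketch {

    public static void main(String[] args) {
        // Explicit client configuration, e.g. for SSL/SASL. bootstrap.servers must be set here
        // because these Properties take precedence over the bootstrapServers shorthand.
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093"); // placeholder
        consumerConfig.put("security.protocol", "SSL");                              // placeholder

        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093"); // placeholder
        producerConfig.put("security.protocol", "SSL");                              // placeholder

        // Constructor arguments: topic, consumerConfig, producerConfig, maxCacheSize, pollDurationMs.
        KafkaIdempotentRepository repo = new KafkaIdempotentRepository(
                "my-idempotent-ids", consumerConfig, producerConfig, 1000, 100);
    }
}
----------------------------------------------------------------------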