camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [11/14] camel git commit: Refactored initialisation to allow property-based construction via XML DSL. Enabled consumer poll duration to be externally defined.
Date Fri, 03 Mar 2017 19:36:34 GMT
Refactored initialisation to allow property-based construction via XML DSL. Enabled consumer
poll duration to be externally defined.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff7fec34
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff7fec34
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff7fec34

Branch: refs/heads/master
Commit: ff7fec34141a00a7c1415675b4dffc38dfca62ed
Parents: e068386
Author: Jakub Korab <jakub.korab@gmail.com>
Authored: Fri Mar 3 13:44:15 2017 +0000
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Mar 3 20:03:08 2017 +0100

----------------------------------------------------------------------
 .../kafka/KafkaIdempotentRepository.java        | 163 ++++++++++++++++---
 1 file changed, 138 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ff7fec34/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 95c2ddd..ad0b670 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -75,15 +75,22 @@ import org.slf4j.LoggerFactory;
 public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository<String>,
CamelContextAware {
 
     private static final int DEFAULT_MAXIMUM_CACHE_SIZE = 1000;
+    private static final int DEFAULT_POLL_DURATION_MS = 100;
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
-    private final Map<String, Object> cache;
 
     private final AtomicLong duplicateCount = new AtomicLong(0);
-    private final String topic;
-    private final Properties producerConfig;
-    private final Properties consumerConfig;
 
+    // configurable
+    private String topic;
+    private String bootstrapServers;
+    private Properties producerConfig;
+    private Properties consumerConfig;
+    private int maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
+    private int pollDurationMs = DEFAULT_POLL_DURATION_MS;
+
+    // internal properties
+    private Map<String, Object> cache;
     private Consumer<String, String> consumer;
     private Producer<String, String> producer;
     private TopicPoller topicPoller;
@@ -98,41 +105,147 @@ public class KafkaIdempotentRepository extends ServiceSupport implements
Idempot
         clear
     }
 
+    /**
+     * No-op constructor for XML/property-based object initialisation. From Java, prefer
one of the other constructors.
+     */
+    public KafkaIdempotentRepository() {
+    }
+
     public KafkaIdempotentRepository(String topic, String bootstrapServers) {
-        this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE);
+        this(topic, bootstrapServers, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
     }
 
-    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize)
{
+    public KafkaIdempotentRepository(String topic, String bootstrapServers, int maxCacheSize,
int pollDurationMs) {
         StringHelper.notEmpty(topic, "topic");
         StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 
-        Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        this.topic = topic;
+        this.bootstrapServers = bootstrapServers;
+        this.maxCacheSize = maxCacheSize;
+        this.pollDurationMs = pollDurationMs;
+    }
+
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties
producerConfig) {
+        this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
+    }
 
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties
producerConfig, int maxCacheSize, int pollDurationMs) {
         this.topic = topic;
         this.consumerConfig = consumerConfig;
         this.producerConfig = producerConfig;
-        this.cache = Collections.synchronizedMap(new LRUCache<>(maxCacheSize));
+        this.maxCacheSize = maxCacheSize;
+        this.pollDurationMs = pollDurationMs;
     }
 
-    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties
producerConfig) {
-        this(topic, consumerConfig, producerConfig, DEFAULT_MAXIMUM_CACHE_SIZE);
+    public String getTopic() {
+        return topic;
     }
 
-    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties
producerConfig, int maxCacheSize) {
-        StringHelper.notEmpty(topic, "topic");
+    /**
+     * Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate
repository
+     * should use a different topic.
+     * @param topic The topic name.
+     */
+    public void setTopic(String topic) {
         this.topic = topic;
-        ObjectHelper.notNull(consumerConfig, "consumerConfig");
-        this.consumerConfig = consumerConfig;
-        ObjectHelper.notNull(producerConfig, "producerConfig");
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    /**
+     * Sets the {@code bootstrap.servers} property on the internal Kafka producer
and consumer. Use this as shorthand
+     * if not setting {@link #consumerConfig} and {@link #producerConfig}. If used, this
component will apply sensible
+     * default configurations for the producer and consumer.
+     * @param bootstrapServers The {@code bootstrap.servers} value to use.
+     */
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public Properties getProducerConfig() {
+        return producerConfig;
+    }
+
+    /**
+     * Sets the properties that will be used by the Kafka producer. Overrides {@link #bootstrapServers},
so must define
+     * the {@code bootstrap.servers} property itself.
+     *
+     * Prefer using {@link #bootstrapServers} for default configuration unless you specifically
need non-standard
+     * configuration options such as SSL/SASL.
+     * @param producerConfig The producer configuration properties.
+     */
+    public void setProducerConfig(Properties producerConfig) {
         this.producerConfig = producerConfig;
-        this.cache = Collections.synchronizedMap(new LRUCache<>(maxCacheSize));
+    }
+
+    public Properties getConsumerConfig() {
+        return consumerConfig;
+    }
+
+    /**
+     * Sets the properties that will be used by the Kafka consumer. Overrides {@link #bootstrapServers},
so must define
+     * the {@code bootstrap.servers} property itself.
+     *
+     * Prefer using {@link #bootstrapServers} for default configuration unless you specifically
need non-standard
+     * configuration options such as SSL/SASL.
+     * @param consumerConfig The consumer configuration properties.
+     */
+    public void setConsumerConfig(Properties consumerConfig) {
+        this.consumerConfig = consumerConfig;
+    }
+
+    public int getMaxCacheSize() {
+        return maxCacheSize;
+    }
+
+    /**
+     * Sets the maximum size of the local key cache.
+     * @param maxCacheSize The maximum key cache size.
+     */
+    public void setMaxCacheSize(int maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+    }
+
+    public int getPollDurationMs() {
+        return pollDurationMs;
+    }
+
+    /**
+     * Sets the poll duration of the Kafka consumer. The local caches are updated immediately;
this value will affect
+     * how far behind other peers in the cluster are, which are updating their caches from
the topic, relative to the
+     * idempotent consumer instance that issued the cache action message.
+     *
+     * The default value of this is {@link #DEFAULT_POLL_DURATION_MS}. If setting this value
explicitly, be aware that
+     * there is a tradeoff between the remote cache liveness and the volume of network traffic
between this repository's
+     * consumer and the Kafka brokers.
+     * @param pollDurationMs The poll duration in milliseconds.
+     */
+    public void setPollDurationMs(int pollDurationMs) {
+        this.pollDurationMs = pollDurationMs;
     }
 
     @Override
     protected void doStart() throws Exception {
+        StringHelper.notEmpty(topic, "topic");
+        this.cache = Collections.synchronizedMap(new LRUCache<>(maxCacheSize));
+
+        if (consumerConfig == null) {
+            consumerConfig = new Properties();
+            StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
+            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        }
+
+        if (producerConfig == null) {
+            producerConfig = new Properties();
+            StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
+            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        }
+
+        ObjectHelper.notNull(consumerConfig, "consumerConfig");
+        ObjectHelper.notNull(producerConfig, "producerConfig");
+
         // each consumer instance must have control over its own offset, so assign a groupID
at random
         String groupId = UUID.randomUUID().toString();
         log.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -146,13 +259,13 @@ public class KafkaIdempotentRepository extends ServiceSupport implements
Idempot
 
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        // set up the producer to remove all batching on send
+        // set up the producer to remove all batching on send; we want all sends to be fully
synchronous
         producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1");
         producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
         producer = new KafkaProducer<>(producerConfig);
 
         cacheReadyLatch = new CountDownLatch(1);
-        topicPoller = new TopicPoller(consumer, cacheReadyLatch);
+        topicPoller = new TopicPoller(consumer, cacheReadyLatch, pollDurationMs);
     }
 
     @Override
@@ -264,18 +377,18 @@ public class KafkaIdempotentRepository extends ServiceSupport implements
Idempot
 
     private class TopicPoller implements Runnable {
 
-        private static final int POLL_DURATION_MS = 100;
-
         private final Logger log = LoggerFactory.getLogger(this.getClass());
         private final Consumer<String, String> consumer;
         private final CountDownLatch cacheReadyLatch;
+        private final int pollDurationMs;
 
         private CountDownLatch shutdownLatch = new CountDownLatch(1);
         private AtomicBoolean running = new AtomicBoolean(true);
 
-        TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch)
{
+        TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch,
int pollDurationMs) {
             this.consumer = consumer;
             this.cacheReadyLatch = cacheReadyLatch;
+            this.pollDurationMs = pollDurationMs;
         }
 
         @Override
@@ -287,7 +400,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements
Idempot
 
             POLL_LOOP: while (running.get()) {
                 log.trace("Polling");
-                ConsumerRecords<String, String> consumerRecords = consumer.poll(POLL_DURATION_MS);
+                ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
                 if (consumerRecords.isEmpty()) {
                     // the first time this happens, we can assume that we have consumed all
                     // messages up to this point


Mime
View raw message