camel-commits mailing list archives

From davscl...@apache.org
Subject [2/2] camel git commit: Component docs
Date Mon, 18 May 2015 15:04:05 GMT
Component docs


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3d0f2f2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3d0f2f2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3d0f2f2

Branch: refs/heads/master
Commit: c3d0f2f222aaaa60a4bc0300aa4eb6582e7143e7
Parents: ee5c7a0
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon May 18 17:06:16 2015 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon May 18 17:07:31 2015 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaComponent.java   |   7 +-
 .../component/kafka/KafkaConfiguration.java     | 162 ++++++++++++++++---
 .../camel/component/kafka/KafkaEndpoint.java    |  13 +-
 .../component/kafka/KafkaEndpointTest.java      |   6 +-
 .../component/kafka/KafkaProducerTest.java      |   4 +-
 5 files changed, 154 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index bf33a03..b659e73 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -35,7 +35,12 @@ public class KafkaComponent extends UriEndpointComponent {
     protected KafkaEndpoint createEndpoint(String uri,
                                            String remaining,
                                           Map<String, Object> params) throws Exception {
-        KafkaEndpoint endpoint = new KafkaEndpoint(uri, remaining, this);
+
+        KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
+        String brokers = remaining.split("\\?")[0];
+        if (brokers != null) {
+            endpoint.getConfiguration().setBrokers(brokers);
+        }
         setProperties(endpoint, params);
         return endpoint;
     }
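
The hunk above moves broker parsing from KafkaEndpoint into the component: the URI path (everything before '?') is now stored as the brokers option on the shared KafkaConfiguration, while the query parameters are still bound via setProperties. A minimal usage sketch, assuming placeholder broker addresses and topic name:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaRouteSketch extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            // the path part "host1:9092,host2:9092" becomes configuration.getBrokers();
            // topic and requestRequiredAcks are bound by setProperties(endpoint, params)
            from("direct:start")
                .to("kafka:host1:9092,host2:9092?topic=test&requestRequiredAcks=1");
        }
    }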

http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 368b10f..4a2e1ea 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -19,8 +19,10 @@ package org.apache.camel.component.kafka;
 import java.util.Properties;
 
 import kafka.producer.DefaultPartitioner;
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
 
 @UriParams
 public class KafkaConfiguration {
@@ -31,7 +33,7 @@ public class KafkaConfiguration {
     private String zookeeperHost;
     @UriParam(defaultValue = "2181")
     private int zookeeperPort = 2181;
-    @UriParam
+    @UriParam @Metadata(required = "true")
     private String topic;
     @UriParam
     private String groupId;
@@ -90,39 +92,41 @@ public class KafkaConfiguration {
     private Integer zookeeperSyncTimeMs;
 
     //Producer configuration properties
-    @UriParam
-    private String producerType;
-    @UriParam
-    private String compressionCodec;
-    @UriParam
+    @UriPath
+    private String brokers;
+    @UriParam(label = "producer", defaultValue = "sync", enums = "async,sync")
+    private String producerType = "sync";
+    @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy")
+    private String compressionCodec = "none";
+    @UriParam(label = "producer")
     private String compressedTopics;
-    @UriParam
-    private Integer messageSendMaxRetries;
-    @UriParam
-    private Integer retryBackoffMs;
-    @UriParam
-    private Integer topicMetadataRefreshIntervalMs;
+    @UriParam(label = "producer", defaultValue = "3")
+    private Integer messageSendMaxRetries = 3;
+    @UriParam(label = "producer", defaultValue = "100")
+    private Integer retryBackoffMs = 100;
+    @UriParam(label = "producer", defaultValue = "600000")
+    private Integer topicMetadataRefreshIntervalMs = 600 * 1000;
 
     //Sync producer config
-    @UriParam
-    private Integer sendBufferBytes;
-    @UriParam
+    @UriParam(label = "producer", defaultValue = "" + 100 * 1024)
+    private Integer sendBufferBytes = 100 * 1024;
+    @UriParam(label = "producer", defaultValue = "0")
     private short requestRequiredAcks;
-    @UriParam
-    private Integer requestTimeoutMs;
+    @UriParam(label = "producer", defaultValue = "10000")
+    private Integer requestTimeoutMs = 10000;
 
     //Async producer config
-    @UriParam
-    private Integer queueBufferingMaxMs;
-    @UriParam
-    private Integer queueBufferingMaxMessages;
-    @UriParam
+    @UriParam(label = "producer", defaultValue = "5000")
+    private Integer queueBufferingMaxMs = 5000;
+    @UriParam(label = "producer", defaultValue = "10000")
+    private Integer queueBufferingMaxMessages = 10000;
+    @UriParam(label = "producer")
     private Integer queueEnqueueTimeoutMs;
-    @UriParam
-    private Integer batchNumMessages;
-    @UriParam
+    @UriParam(label = "producer", defaultValue = "200")
+    private Integer batchNumMessages = 200;
+    @UriParam(label = "producer")
     private String serializerClass;
-    @UriParam
+    @UriParam(label = "producer")
     private String keySerializerClass;
 
     public KafkaConfiguration() {
@@ -256,6 +260,9 @@ public class KafkaConfiguration {
         return partitioner;
     }
 
+    /**
+     * The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
+     */
     public void setPartitioner(String partitioner) {
         this.partitioner = partitioner;
     }
@@ -264,6 +271,9 @@ public class KafkaConfiguration {
         return topic;
     }
 
+    /**
+     * Name of the topic to use
+     */
     public void setTopic(String topic) {
         this.topic = topic;
     }
@@ -272,6 +282,9 @@ public class KafkaConfiguration {
         return consumerStreams;
     }
 
+    /**
+     * Number of concurrent consumers on the consumer
+     */
     public void setConsumerStreams(int consumerStreams) {
         this.consumerStreams = consumerStreams;
     }
@@ -395,6 +408,9 @@ public class KafkaConfiguration {
         return queuedMaxMessageChunks;
     }
 
+    /**
+     * Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes.
+     */
     public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) {
         this.queuedMaxMessageChunks = queuedMaxMessageChunks;
     }
@@ -515,10 +531,31 @@ public class KafkaConfiguration {
         this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
     }
 
+    public String getBrokers() {
+        return brokers;
+    }
+
+    /**
+     * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
+     * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+     * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
+     * <p/>
+     * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+     */
+    public void setBrokers(String brokers) {
+        this.brokers = brokers;
+    }
+
     public String getProducerType() {
         return producerType;
     }
 
+    /**
+     * This parameter specifies whether the messages are sent asynchronously in a background thread.
+     * Valid values are (1) async for asynchronous send and (2) sync for synchronous send.
+     * By setting the producer to async we allow batching together of requests (which is great for throughput)
+     * but open the possibility of a failure of the client machine dropping unsent data.
+     */
     public void setProducerType(String producerType) {
         this.producerType = producerType;
     }
@@ -527,6 +564,9 @@ public class KafkaConfiguration {
         return compressionCodec;
     }
 
+    /**
+     * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".
+     */
     public void setCompressionCodec(String compressionCodec) {
         this.compressionCodec = compressionCodec;
     }
@@ -535,6 +575,12 @@ public class KafkaConfiguration {
         return compressedTopics;
     }
 
+    /**
+     * This parameter allows you to set whether compression should be turned on for particular topics.
+     * If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any.
+     * If the list of compressed topics is empty, then enable the specified compression codec for all topics.
+     * If the compression codec is NoCompressionCodec, compression is disabled for all topics.
+     */
     public void setCompressedTopics(String compressedTopics) {
         this.compressedTopics = compressedTopics;
     }
@@ -543,6 +589,11 @@ public class KafkaConfiguration {
         return messageSendMaxRetries;
     }
 
+    /**
+     * This property will cause the producer to automatically retry a failed send request.
+     * This property specifies the number of retries when such failures occur. Note that setting a non-zero value here
+     * can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost.
+     */
     public void setMessageSendMaxRetries(Integer messageSendMaxRetries) {
         this.messageSendMaxRetries = messageSendMaxRetries;
     }
@@ -551,6 +602,10 @@ public class KafkaConfiguration {
         return retryBackoffMs;
     }
 
+    /**
+     * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
+     * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
+     */
     public void setRetryBackoffMs(Integer retryBackoffMs) {
         this.retryBackoffMs = retryBackoffMs;
     }
@@ -559,6 +614,14 @@ public class KafkaConfiguration {
         return topicMetadataRefreshIntervalMs;
     }
 
+    /**
+     * The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing,
+     * leader not available...). It will also poll regularly (default: every 10min so 600000ms).
+     * If you set this to a negative value, metadata will only get refreshed on failure.
+     * If you set this to zero, the metadata will get refreshed after each message sent (not recommended).
+     * Important note: the refresh happens only AFTER the message is sent, so if the producer never
+     * sends a message the metadata is never refreshed.
+     */
    public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) {
         this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs;
     }
@@ -567,6 +630,9 @@ public class KafkaConfiguration {
         return sendBufferBytes;
     }
 
+    /**
+     * Socket write buffer size
+     */
     public void setSendBufferBytes(Integer sendBufferBytes) {
         this.sendBufferBytes = sendBufferBytes;
     }
@@ -575,6 +641,22 @@ public class KafkaConfiguration {
         return requestRequiredAcks;
     }
 
+    /**
+     * This value controls when a produce request is considered completed. Specifically,
+     * how many other brokers must have committed the data to their log and acknowledged this to the leader?
+     * Typical values are (0, 1 or -1):
+     * 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7).
+     * This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
+     * 1, which means that the producer gets an acknowledgement after the leader replica has received the data.
+     * This option provides better durability as the client waits until the server acknowledges the request as successful
+     * (only messages that were written to the now-dead leader but not yet replicated will be lost).
+     * -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data.
+     * This option provides the greatest level of durability.
+     * However, it does not completely eliminate the risk of message loss because the number of in-sync replicas may,
+     * in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas
+     * (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting.
+     * Please read the Replication section of the design documentation for a more in-depth discussion.
+     */
     public void setRequestRequiredAcks(short requestRequiredAcks) {
         this.requestRequiredAcks = requestRequiredAcks;
     }
@@ -583,6 +665,9 @@ public class KafkaConfiguration {
         return requestTimeoutMs;
     }
 
+    /**
+     * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
+     */
     public void setRequestTimeoutMs(Integer requestTimeoutMs) {
         this.requestTimeoutMs = requestTimeoutMs;
     }
@@ -591,6 +676,11 @@ public class KafkaConfiguration {
         return queueBufferingMaxMs;
     }
 
+    /**
+     * Maximum time to buffer data when using async mode.
+     * For example a setting of 100 will try to batch together 100ms of messages to send at once.
+     * This will improve throughput but adds message delivery latency due to the buffering.
+     */
     public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) {
         this.queueBufferingMaxMs = queueBufferingMaxMs;
     }
@@ -599,6 +689,10 @@ public class KafkaConfiguration {
         return queueBufferingMaxMessages;
     }
 
+    /**
+     * The maximum number of unsent messages that can be queued up by the producer when using async
+     * mode before either the producer must be blocked or data must be dropped.
+     */
     public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
         this.queueBufferingMaxMessages = queueBufferingMaxMessages;
     }
@@ -607,6 +701,11 @@ public class KafkaConfiguration {
         return queueEnqueueTimeoutMs;
     }
 
+    /**
+     * The amount of time to block before dropping messages when running in async mode and the buffer has reached
+     * queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full
+     * (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send.
+     */
     public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) {
         this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs;
     }
@@ -615,6 +714,10 @@ public class KafkaConfiguration {
         return batchNumMessages;
     }
 
+    /**
+     * The number of messages to send in one batch when using async mode.
+     * The producer will wait until either this number of messages are ready to send or queue.buffering.max.ms is reached.
+     */
     public void setBatchNumMessages(Integer batchNumMessages) {
         this.batchNumMessages = batchNumMessages;
     }
@@ -623,6 +726,10 @@ public class KafkaConfiguration {
         return serializerClass;
     }
 
+    /**
+     * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
+     * The default class is kafka.serializer.DefaultEncoder.
+     */
     public void setSerializerClass(String serializerClass) {
         this.serializerClass = serializerClass;
     }
@@ -631,6 +738,9 @@ public class KafkaConfiguration {
         return keySerializerClass;
     }
 
+    /**
+     * The serializer class for keys (defaults to the same as for messages if nothing is given).
+     */
     public void setKeySerializerClass(String keySerializerClass) {
         this.keySerializerClass = keySerializerClass;
     }
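
The new @UriPath brokers field and the producer defaults documented above can also be set programmatically on the configuration object. A short sketch, using hypothetical broker addresses; the commented values mirror the defaults introduced in this commit:

    import org.apache.camel.component.kafka.KafkaConfiguration;

    public class KafkaConfigurationSketch {
        public static void main(String[] args) {
            KafkaConfiguration config = new KafkaConfiguration();
            config.setBrokers("host1:9092,host2:9092"); // metadata.broker.list in Kafka terms
            config.setProducerType("async");            // default is "sync"
            config.setCompressionCodec("snappy");       // default is "none"
            config.setRequestRequiredAcks((short) 1);   // wait for the leader ack; default is 0
            config.setMessageSendMaxRetries(3);         // matches the new default
        }
    }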

http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index a424955..bebe6d7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -37,19 +37,14 @@ import org.apache.camel.spi.UriPath;
 @UriEndpoint(scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", consumerClass = KafkaConsumer.class, label = "messaging")
 public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
 
-    @UriPath @Metadata(required = "true")
-    private String brokers;
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
 
     public KafkaEndpoint() {
     }
 
-    public KafkaEndpoint(String endpointUri,
-                         String remaining,
-                         KafkaComponent component) throws URISyntaxException {
+    public KafkaEndpoint(String endpointUri, KafkaComponent component) {
         super(endpointUri, component);
-        this.brokers = remaining.split("\\?")[0];
     }
 
     public KafkaConfiguration getConfiguration() {
@@ -156,7 +151,11 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
     }
 
     public String getBrokers() {
-        return brokers;
+        return configuration.getBrokers();
+    }
+
+    public void setBrokers(String brokers) {
+        configuration.setBrokers(brokers);
     }
 
     public int getConsumerStreams() {
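
With this hunk, getBrokers() on the endpoint delegates to the shared KafkaConfiguration, and the new setBrokers(String) makes the option settable directly on the endpoint. A small sketch of the delegation, using a placeholder broker address:

    import org.apache.camel.component.kafka.KafkaComponent;
    import org.apache.camel.component.kafka.KafkaEndpoint;

    public class BrokerDelegationSketch {
        public static void main(String[] args) {
            KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost:9092", new KafkaComponent());
            endpoint.setBrokers("localhost:9092");
            // the value is stored on (and read back from) the shared configuration
            System.out.println(endpoint.getConfiguration().getBrokers()); // prints localhost:9092
        }
    }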

http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index 6ac6f81..ed4a6d1 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -32,7 +32,8 @@ public class KafkaEndpointTest {
 
     @Test
     public void testCreatingKafkaExchangeSetsHeaders() throws URISyntaxException {
-        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "localhost", new KafkaComponent());
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent());
+        endpoint.setBrokers("localhost");
 
         Message message = new Message("mymessage".getBytes(), "somekey".getBytes());
         DefaultDecoder decoder = new DefaultDecoder(null);
@@ -47,7 +48,8 @@ public class KafkaEndpointTest {
 
     @Test
     public void assertSingleton() throws URISyntaxException {
-        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "localhost", new KafkaComponent());
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new KafkaComponent());
+        endpoint.setBrokers("localhost");
         assertTrue(endpoint.isSingleton());
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c3d0f2f2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index d989c96..8e5b0f7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -41,8 +41,8 @@ public class KafkaProducerTest {
 
     @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws Exception {
-        endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic",
-                "broker1:1234," + "broker2:4567?topic=sometopic", null);
+        endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", null);
+        endpoint.setBrokers("broker1:1234,broker2:4567");
         producer = new KafkaProducer(endpoint);
         producer.producer = Mockito.mock(Producer.class);
     }

