camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/3] camel git commit: Make kafka bridgeEndpoint option on configuration so they are all there and thus also available via spring boot
Date Mon, 24 Apr 2017 11:34:40 GMT
Make kafka bridgeEndpoint option on configuration so they are all there and thus also available
via spring boot


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8b5e93ec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8b5e93ec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8b5e93ec

Branch: refs/heads/master
Commit: 8b5e93ec694ef31a2bd43248735d3bc274f4e9d3
Parents: 1d164d5
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Apr 24 13:24:20 2017 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Apr 24 13:24:20 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  2 +-
 .../component/kafka/KafkaConfiguration.java     | 32 ++++++++++++++++++++
 .../camel/component/kafka/KafkaEndpoint.java    | 29 ------------------
 .../camel/component/kafka/KafkaProducer.java    |  4 +--
 .../component/kafka/KafkaProducerTest.java      |  6 ++--
 .../springboot/KafkaComponentConfiguration.java | 32 ++++++++++++++++++++
 6 files changed, 70 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index dceca6f..5637673 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -100,7 +100,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an
exchange. |  | ExchangePattern
 | **bridgeEndpoint** (producer) | If the option is true then KafkaProducer will ignore the
KafkaConstants.TOPIC header setting of the inbound message. | false | boolean
 | **bufferMemorySize** (producer) | The total bytes of memory the producer can use to buffer
records waiting to be sent to the server. If records are sent faster than they can be delivered
to the server the producer will either block or throw an exception based on the preference
specified by block.on.buffer.full.This setting should correspond roughly to the total memory
the producer will use but is not a hard bound since not all memory the producer uses is used
for buffering. Some additional memory will be used for compression (if compression is enabled)
as well as for maintaining in-flight requests. | 33554432 | Integer
-| **circularKeyDetection** (producer) | If the option is true then KafkaProducer will detect
if the message is attempted to be sent back to the same topic it may come from if the message
was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the
original kafka consumer topic then the header setting is ignored and the topic of the producer
endpoint is used. In other words this avoids sending the same message back to where it came
from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
+| **circularTopicDetection** (producer) | If the option is true then KafkaProducer will detect
if the message is attempted to be sent back to the same topic it may come from if the message
was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the
original kafka consumer topic then the header setting is ignored and the topic of the producer
endpoint is used. In other words this avoids sending the same message back to where it came
from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
 | **compressionCodec** (producer) | This parameter allows you to specify the compression
codec for all data generated by this producer. Valid values are none gzip and snappy. | none
| String
 | **connectionMaxIdleMs** (producer) | Close idle connections after the number of milliseconds
specified by this config. | 540000 | Integer
 | **key** (producer) | The record key (or null if no key is specified). If this option has
been configured then it take precedence over header link KafkaConstantsKEY |  | String

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index a609c79..bb4acbc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -118,6 +118,12 @@ public class KafkaConfiguration implements Cloneable {
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
 
+    //Producer Camel specific configuration properties
+    @UriParam(label = "producer")
+    private boolean bridgeEndpoint;
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean circularTopicDetection = true;
+
     //Producer configuration properties
     @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
     private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@@ -493,6 +499,32 @@ public class KafkaConfiguration implements Cloneable {
         this.groupId = groupId;
     }
 
+    public boolean isBridgeEndpoint() {
+        return bridgeEndpoint;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header
setting of the inbound message.
+     */
+    public void setBridgeEndpoint(boolean bridgeEndpoint) {
+        this.bridgeEndpoint = bridgeEndpoint;
+    }
+
+    public boolean isCircularTopicDetection() {
+        return circularTopicDetection;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will detect if the message is attempted
to be sent back to the same topic
+     * it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC
header is the
+     * same as the original kafka consumer topic, then the header setting is ignored, and
the topic of the producer
+     * endpoint is used. In other words this avoids sending the same message back to where
it came from.
+     * This option is not in use if the option bridgeEndpoint is set to true.
+     */
+    public void setCircularTopicDetection(boolean circularTopicDetection) {
+        this.circularTopicDetection = circularTopicDetection;
+    }
+
     public String getPartitioner() {
         return partitioner;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index b4c86ef..14932bf 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -50,10 +50,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
 
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
-    @UriParam(label = "producer")
-    private boolean bridgeEndpoint;
-    @UriParam(label = "producer", defaultValue = "true")
-    private boolean circularTopicDetection = true;
 
     public KafkaEndpoint() {
     }
@@ -187,29 +183,4 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return new KafkaProducer(endpoint);
     }
 
-    public boolean isBridgeEndpoint() {
-        return bridgeEndpoint;
-    }
-
-    /**
-     * If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header
setting of the inbound message.
-     */
-    public void setBridgeEndpoint(boolean bridgeEndpoint) {
-        this.bridgeEndpoint = bridgeEndpoint;
-    }
-
-    public boolean isCircularTopicDetection() {
-        return circularTopicDetection;
-    }
-
-    /**
-     * If the option is true, then KafkaProducer will detect if the message is attempted
to be sent back to the same topic
-     * it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC
header is the
-     * same as the original kafka consumer topic, then the header setting is ignored, and
the topic of the producer
-     * endpoint is used. In other words this avoids sending the same message back to where
it came from.
-     * This option is not in use if the option bridgeEndpoint is set to true.
-     */
-    public void setCircularTopicDetection(boolean circularTopicDetection) {
-        this.circularTopicDetection = circularTopicDetection;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 01d29b5..ede3d3e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -122,13 +122,13 @@ public class KafkaProducer extends DefaultAsyncProducer {
     protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException
{
         String topic = endpoint.getConfiguration().getTopic();
 
-        if (!endpoint.isBridgeEndpoint()) {
+        if (!endpoint.getConfiguration().isBridgeEndpoint()) {
             String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
             boolean allowHeader = true;
 
             // when we do not bridge then detect if we try to send back to ourselves
             // which we most likely do not want to do
-            if (headerTopic != null && endpoint.isCircularTopicDetection()) {
+            if (headerTopic != null && endpoint.getConfiguration().isCircularTopicDetection())
{
                 Endpoint from = exchange.getFromEndpoint();
                 if (from instanceof KafkaEndpoint) {
                     String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 9143017..41378b8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -236,7 +236,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithBridgeEndpoint() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
-        endpoint.setBridgeEndpoint(true);
+        endpoint.getConfiguration().setBridgeEndpoint(true);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
@@ -252,7 +252,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithCircularDetected() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
-        endpoint.setCircularTopicDetection(true); // enable by default
+        endpoint.getConfiguration().setCircularTopicDetection(true);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
@@ -269,7 +269,7 @@ public class KafkaProducerTest {
     @Test
     public void processSendMessageWithNoCircularDetected() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
-        endpoint.setCircularTopicDetection(false); // enable by default
+        endpoint.getConfiguration().setCircularTopicDetection(false);
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
         Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);

http://git-wip-us.apache.org/repos/asf/camel/blob/8b5e93ec/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 160c552..f5dfddd 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -117,6 +117,22 @@ public class KafkaComponentConfiguration {
          */
         private String groupId;
         /**
+         * If the option is true, then KafkaProducer will ignore the
+         * KafkaConstants.TOPIC header setting of the inbound message.
+         */
+        private Boolean bridgeEndpoint = false;
+        /**
+         * If the option is true, then KafkaProducer will detect if the message
+         * is attempted to be sent back to the same topic it may come from, if
+         * the message was original from a kafka consumer. If the
+         * KafkaConstants.TOPIC header is the same as the original kafka
+         * consumer topic, then the header setting is ignored, and the topic of
+         * the producer endpoint is used. In other words this avoids sending the
+         * same message back to where it came from. This option is not in use if
+         * the option bridgeEndpoint is set to true.
+         */
+        private Boolean circularTopicDetection = true;
+        /**
          * The partitioner class for partitioning messages amongst sub-topics.
          * The default partitioner is based on the hash of the key.
          */
@@ -631,6 +647,22 @@ public class KafkaComponentConfiguration {
             this.groupId = groupId;
         }
 
+        public Boolean getBridgeEndpoint() {
+            return bridgeEndpoint;
+        }
+
+        public void setBridgeEndpoint(Boolean bridgeEndpoint) {
+            this.bridgeEndpoint = bridgeEndpoint;
+        }
+
+        public Boolean getCircularTopicDetection() {
+            return circularTopicDetection;
+        }
+
+        public void setCircularTopicDetection(Boolean circularTopicDetection) {
+            this.circularTopicDetection = circularTopicDetection;
+        }
+
         public String getPartitioner() {
             return partitioner;
         }


Mime
View raw message