camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject camel git commit: CAMEL-8923 Fixed the infinite loop by adding bridgeEndpoint option to kafka endpoint
Date Wed, 01 Jul 2015 15:05:18 GMT
Repository: camel
Updated Branches:
  refs/heads/master 0f636566e -> 19e70a6a3


CAMEL-8923 Fixed the infinite loop by adding bridgeEndpoint option to kafka endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/19e70a6a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/19e70a6a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/19e70a6a

Branch: refs/heads/master
Commit: 19e70a6a36b75a1e2c3291d9d21df24e263023fa
Parents: 0f63656
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Wed Jul 1 23:03:12 2015 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Wed Jul 1 23:03:31 2015 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConfiguration.java     |  1 -
 .../apache/camel/component/kafka/KafkaEndpoint.java   | 14 +++++++++++---
 .../apache/camel/component/kafka/KafkaProducer.java   |  5 ++++-
 .../camel/component/kafka/KafkaProducerTest.java      | 13 +++++++++++++
 4 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/19e70a6a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 5bd8ee6..4858af9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -39,7 +39,6 @@ public class KafkaConfiguration {
     private String groupId;
     @UriParam(defaultValue = "DefaultPartitioner")
     private String partitioner = DefaultPartitioner.class.getCanonicalName();
-
     @UriParam(label = "consumer", defaultValue = "10")
     private int consumerStreams = 10;
     @UriParam(label = "consumer", defaultValue = "1")

http://git-wip-us.apache.org/repos/asf/camel/blob/19e70a6a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index df213f3..bbf7062 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.kafka;
 
-import java.net.URISyntaxException;
 import java.util.concurrent.ExecutorService;
 
 import kafka.message.MessageAndMetadata;
@@ -29,16 +28,17 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultMessage;
-import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
 
 @UriEndpoint(scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", consumerClass = KafkaConsumer.class, label = "messaging")
 public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
 
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
+    
+    @UriParam(defaultValue = "false")
+    private boolean bridgeEndpoint;
 
     public KafkaEndpoint() {
     }
@@ -458,4 +458,12 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public boolean isMultipleConsumersSupported() {
         return true;
     }
+
+    public boolean isBridgeEndpoint() {
+        return bridgeEndpoint;
+    }
+
+    public void setBridgeEndpoint(boolean bridgeEndpoint) {
+        this.bridgeEndpoint = bridgeEndpoint;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/19e70a6a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index c598cf9..c637652 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -61,7 +61,10 @@ public class KafkaProducer extends DefaultProducer {
 
     @Override
     public void process(Exchange exchange) throws CamelException {
-        String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+        String topic = endpoint.getTopic();
+        if (!endpoint.isBridgeEndpoint()) {
+            topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class);
+        }
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/19e70a6a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 8e5b0f7..f2bcd6b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -128,6 +128,19 @@ public class KafkaProducerTest {
 
         verifySendMessage("someKey", "someTopic", "someKey");
     }
+    
+    @Test
+    public void processSendMessageWithBridgeEndpoint() throws Exception {
+        endpoint.setTopic("someTopic");
+        endpoint.setBridgeEndpoint(true);
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
+        
+        producer.process(exchange);
+        
+        verifySendMessage("someKey", "someTopic", "someKey");
+    }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     protected void verifySendMessage(String partitionKey, String topic, String messageKey) {


Mime
View raw message