camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-8409: Kafka producer: when no message key specified, use partition key. Thanks to Mark Mindenhall for the patch.
Date Thu, 26 Feb 2015 09:12:41 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.14.x a219cc3d1 -> 108dbfca9
  refs/heads/master 028700818 -> a7dfc4509


CAMEL-8409: Kafka producer: when no message key specified, use partition key. Thanks to Mark
Mindenhall for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a7dfc450
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a7dfc450
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a7dfc450

Branch: refs/heads/master
Commit: a7dfc45097a610dca1a39c86f481182efc20e152
Parents: 0287008
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Thu Feb 26 10:12:39 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Thu Feb 26 10:12:39 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaProducer.java  | 14 +++++++++++++-
 .../camel/component/kafka/KafkaProducerTest.java     | 15 +++++++++++++--
 2 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a7dfc450/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2918ffc..c598cf9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -66,9 +66,21 @@ public class KafkaProducer extends DefaultProducer {
             throw new CamelExchangeException("No topic key set", exchange);
         }
         String partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, String.class);
+        boolean hasPartitionKey = partitionKey != null;
         String messageKey = exchange.getIn().getHeader(KafkaConstants.KEY, String.class);
+        boolean hasMessageKey = messageKey != null;
         String msg = exchange.getIn().getBody(String.class);
-        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg);
+        KeyedMessage<String, String> data;
+        if (hasPartitionKey && hasMessageKey) {
+            data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg);
+        } else if (hasPartitionKey) {
+            data = new KeyedMessage<String, String>(topic, partitionKey, msg);
+        } else if (hasMessageKey) {
+            data = new KeyedMessage<String, String>(topic, messageKey, msg);
+        } else {
+            log.warn("No message key or partition key set");
+            data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg);
+        }
         producer.send(data);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a7dfc450/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index b0f516f..d989c96 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -76,7 +76,7 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("4", "anotherTopic", null);
+        verifySendMessage("4", "anotherTopic", "4");
     }
 
     @Test
@@ -115,7 +115,18 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("4", "someTopic", null);
+        verifySendMessage("4", "someTopic", "4");
+    }
+
+    @Test
+    public void processSendsMesssageWithMessageKeyHeader() throws Exception {
+        endpoint.setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.KEY, "someKey");
+
+        producer.process(exchange);
+
+        verifySendMessage("someKey", "someTopic", "someKey");
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})


Mime
View raw message