camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-10586: make the kafka endpoint a little easier to use. The producer can now automatically convert to the serializer configured.
Date Sat, 04 Mar 2017 09:59:32 GMT
Repository: camel
Updated Branches:
  refs/heads/master 7445dbccb -> cc06080b0


CAMEL-10586: make the kafka endpoint a little easier to use. The producer can now automatically
convert to the serializer configured.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ec9b418a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ec9b418a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ec9b418a

Branch: refs/heads/master
Commit: ec9b418a5b00782f7edeaf766bc2d537ae30269d
Parents: 7445dbc
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sat Mar 4 10:26:15 2017 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sat Mar 4 10:26:15 2017 +0100

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    | 46 +++++++++++++++++---
 1 file changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ec9b418a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index b5c192e..f78e369 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -131,11 +131,15 @@ public class KafkaProducer extends DefaultAsyncProducer {
         final boolean hasPartitionKey = partitionKey != null;
 
         // endpoint take precedence over header configuration
-        final Object messageKey = endpoint.getConfiguration().getKey() != null
+        Object key = endpoint.getConfiguration().getKey() != null
             ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY);
+        final Object messageKey = key != null
+            ? getMessageKey(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null;
         final boolean hasMessageKey = messageKey != null;
 
         Object msg = exchange.getIn().getBody();
+
+        // is the message body a list or something that contains multiple values
         Iterator<Object> iterator = null;
         if (msg instanceof Iterable) {
             iterator = ((Iterable<Object>)msg).iterator();
@@ -153,12 +157,16 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
                 @Override
                 public ProducerRecord next() {
+                    // must convert each entry of the iterator into the value according to the serializer
+                    Object next = msgList.next();
+                    Object value = getMessageValue(exchange, next, endpoint.getConfiguration().getSerializerClass());
+
                     if (hasPartitionKey && hasMessageKey) {
-                        return new ProducerRecord(msgTopic, partitionKey, messageKey, msgList.next());
+                        return new ProducerRecord(msgTopic, partitionKey, key, value);
                     } else if (hasMessageKey) {
-                        return new ProducerRecord(msgTopic, messageKey, msgList.next());
+                        return new ProducerRecord(msgTopic, key, value);
                     }
-                    return new ProducerRecord(msgTopic, msgList.next());
+                    return new ProducerRecord(msgTopic, value);
                 }
 
                 @Override
@@ -167,14 +175,18 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 }
             };
         }
+
+        // must convert each entry of the iterator into the value according to the serializer
+        Object value = getMessageValue(exchange, msg, endpoint.getConfiguration().getSerializerClass());
+
         ProducerRecord record;
         if (hasPartitionKey && hasMessageKey) {
-            record = new ProducerRecord(topic, partitionKey, messageKey, msg);
+            record = new ProducerRecord(topic, partitionKey, key, value);
         } else if (hasMessageKey) {
-            record = new ProducerRecord(topic, messageKey, msg);
+            record = new ProducerRecord(topic, key, value);
         } else {
             log.warn("No message key or partition key set");
-            record = new ProducerRecord(topic, msg);
+            record = new ProducerRecord(topic, value);
         }
         return Collections.singletonList(record).iterator();
     }
@@ -222,6 +234,26 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return true;
     }
 
+    protected Object getMessageKey(Exchange exchange, Object key, String keySerializer) {
+        Object answer = key;
+        if (KafkaConstants.KAFKA_DEFAULT_DESERIALIZER.equals(keySerializer)) {
+            // its string based so ensure key is string as well
+            answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, key);
+        }
+        // TODO: other serializers
+        return answer;
+    }
+
+    protected Object getMessageValue(Exchange exchange, Object value, String valueSerializer) {
+        Object answer = value;
+        if (KafkaConstants.KAFKA_DEFAULT_DESERIALIZER.equals(valueSerializer)) {
+            // its string based so ensure value is string as well
+            answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, value);
+        }
+        // TODO: other serializers
+        return answer;
+    }
+
     private final class KafkaProducerCallBack implements Callback {
 
         private final Exchange exchange;


Mime
View raw message