camel-commits mailing list archives

From davscl...@apache.org
Subject [1/3] git commit: Fix the KafkaConsumer to put the message in the body. Previously, the consumer created an exchange for each received message but did not fill the exchange body with the received message content.
Date Thu, 27 Mar 2014 07:02:38 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.13.x 2249559a0 -> 02f2945cd


Fix the KafkaConsumer to put the message in the body
Previously, the consumer created an exchange for each received message
but did not fill the exchange body with the received message content.

For now, the body is set as an array of bytes; in the future we could use the Consumer decoder
class to convert the content to the right type.
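
Because the body is delivered as an array of bytes, code downstream of the consumer has to decode it itself until a decoder is wired in. A minimal sketch of that step follows. The class and method names are hypothetical and not part of camel-kafka, and it assumes the producer encoded the payload as UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of camel-kafka: shows how a downstream
// processor might turn the raw byte[] body into text.
class KafkaBodyDecodingSketch {

    // Assumption: the producer encoded the payload as UTF-8.
    static String decodeBody(Object body) {
        if (body instanceof byte[]) {
            return new String((byte[]) body, StandardCharsets.UTF_8);
        }
        // Bodies that are not byte[] are rendered with String.valueOf.
        return String.valueOf(body);
    }

    public static void main(String[] args) {
        byte[] raw = "message-0".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeBody(raw)); // prints "message-0"
    }
}
```

Inside a route, the same conversion can usually be left to Camel's type converters by requesting the body as a String, which is also how the integration test below can compare the byte[] bodies against String values.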


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6af7f21b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6af7f21b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6af7f21b

Branch: refs/heads/camel-2.13.x
Commit: 6af7f21b51298c7cda8a263ae74a8f75c4b5af02
Parents: e2d549d
Author: Fabien Chaillou <fabien.chaillou@gmail.com>
Authored: Wed Mar 26 15:14:10 2014 -0400
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Thu Mar 27 08:05:48 2014 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java   | 1 +
 .../test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6af7f21b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index b700850..f88e3d6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -111,6 +111,7 @@ public class KafkaEndpoint extends DefaultEndpoint {
         message.setHeader(KafkaConstants.PARTITION, mm.partition());
         message.setHeader(KafkaConstants.TOPIC, mm.topic());
         message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+        message.setBody(mm.message());
         exchange.setIn(message);
 
         return exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/6af7f21b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
index cb9be59..a8ca6c3 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
@@ -79,6 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport {
     @Test
     public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException {
         to.expectedMessageCount(5);
+        to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4");
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);

