camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/2] camel git commit: Kafka offset of the message included as Camel Exchange header.
Date Fri, 07 Aug 2015 11:53:27 GMT
Kafka offset of the message included as Camel Exchange header.

This might be helpful also for logging purposes.
In the test I've reused an offset value which was already present but
not checked.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c81a0516
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c81a0516
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c81a0516

Branch: refs/heads/master
Commit: c81a051616ecf97139c2ec3a8b972320b570167b
Parents: 3fc9de7
Author: tarilabs <matteo.mortari@gmail.com>
Authored: Fri Aug 7 13:43:07 2015 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Aug 7 14:00:27 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/component/kafka/KafkaConstants.java  | 1 +
 .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java   | 1 +
 .../java/org/apache/camel/component/kafka/KafkaEndpointTest.java    | 1 +
 3 files changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 6c31b65..3397060 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -27,6 +27,7 @@ public final class KafkaConstants {
     public static final String PARTITION = "kafka.EXCHANGE_NAME";
     public static final String KEY = "kafka.CONTENT_TYPE";
     public static final String TOPIC = "kafka.TOPIC";
+    public static final String OFFSET = "kafka.OFFSET";
 
     public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
     public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";

http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 78863f8..165c984 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -113,6 +113,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         Message message = new DefaultMessage();
         message.setHeader(KafkaConstants.PARTITION, mm.partition());
         message.setHeader(KafkaConstants.TOPIC, mm.topic());
+        message.setHeader(KafkaConstants.OFFSET, mm.offset());
         if (mm.key() != null) {
             message.setHeader(KafkaConstants.KEY, new String(mm.key()));
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index ed4a6d1..be16766 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -44,6 +44,7 @@ public class KafkaEndpointTest {
         assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY));
         assertEquals("topic", exchange.getIn().getHeader(KafkaConstants.TOPIC));
         assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION));
+        assertEquals(56L, exchange.getIn().getHeader(KafkaConstants.OFFSET));
     }
 
     @Test


Mime
View raw message