Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 160D818A77 for ; Fri, 7 Aug 2015 11:53:27 +0000 (UTC) Received: (qmail 69157 invoked by uid 500); 7 Aug 2015 11:53:27 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 69105 invoked by uid 500); 7 Aug 2015 11:53:26 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 69092 invoked by uid 99); 7 Aug 2015 11:53:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Aug 2015 11:53:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B821CDF97F; Fri, 7 Aug 2015 11:53:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Fri, 07 Aug 2015 11:53:27 -0000 Message-Id: <0238e5835b4e4136b1023ac20a643a61@git.apache.org> In-Reply-To: <13fd1b79a482462dad7f7f2bbffb60a7@git.apache.org> References: <13fd1b79a482462dad7f7f2bbffb60a7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] camel git commit: Kafka offset of the message included as Camel Exchange header. Kafka offset of the message included as Camel Exchange header. This might be helpful also for logging purposes. In the test I've reused an offset value which was already present but not checked. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c81a0516 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c81a0516 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c81a0516 Branch: refs/heads/master Commit: c81a051616ecf97139c2ec3a8b972320b570167b Parents: 3fc9de7 Author: tarilabs Authored: Fri Aug 7 13:43:07 2015 +0200 Committer: Claus Ibsen Committed: Fri Aug 7 14:00:27 2015 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/camel/component/kafka/KafkaConstants.java | 1 + .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java | 1 + .../java/org/apache/camel/component/kafka/KafkaEndpointTest.java | 1 + 3 files changed, 3 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index 6c31b65..3397060 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -27,6 +27,7 @@ public final class KafkaConstants { public static final String PARTITION = "kafka.EXCHANGE_NAME"; public static final String KEY = "kafka.CONTENT_TYPE"; public static final String TOPIC = "kafka.TOPIC"; + public static final String OFFSET = "kafka.OFFSET"; public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder"; public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder"; http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 78863f8..165c984 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -113,6 +113,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS Message message = new DefaultMessage(); message.setHeader(KafkaConstants.PARTITION, mm.partition()); message.setHeader(KafkaConstants.TOPIC, mm.topic()); + message.setHeader(KafkaConstants.OFFSET, mm.offset()); if (mm.key() != null) { message.setHeader(KafkaConstants.KEY, new String(mm.key())); } http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java index ed4a6d1..be16766 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java @@ -44,6 +44,7 @@ public class KafkaEndpointTest { assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY)); assertEquals("topic", exchange.getIn().getHeader(KafkaConstants.TOPIC)); assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION)); + assertEquals(56L, exchange.getIn().getHeader(KafkaConstants.OFFSET)); } @Test