nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject [nifi] branch master updated: NIFI-5267 Kafka support to replay by timestamp
Date Fri, 15 Mar 2019 13:19:24 GMT
This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 45ebeba  NIFI-5267 Kafka support to replay by timestamp
45ebeba is described below

commit 45ebeba846cf257c0dd27669c25e244208758fb9
Author: Sandish Kumar <sany@phdata.io>
AuthorDate: Wed Mar 13 17:35:51 2019 -0500

    NIFI-5267 Kafka support to replay by timestamp
    
    This closes #3372.
    
    Signed-off-by: Bryan Bende <bbende@apache.org>
---
 .../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java    | 1 +
 .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java   | 4 ++++
 .../org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java  | 1 +
 .../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java    | 1 +
 .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java   | 4 ++++
 .../org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java  | 1 +
 .../apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java   | 1 +
 .../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java     | 1 +
 .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java   | 4 ++++
 .../org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java  | 1 +
 .../apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java   | 1 +
 .../org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java     | 1 +
 .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java   | 4 ++++
 .../org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java  | 1 +
 14 files changed, 26 insertions(+)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 0d80e76..4bc26a1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -63,6 +63,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of
message if present and if single message. "
             + "How the key is encoded depends on the value of the 'Key Attribute Encoding'
property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset
of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The
timestamp of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The
partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic
the message or message bundle is from")
 })
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 26562b9..85c85f6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -442,6 +442,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             // And continue to the next message.
             final Map<String, String> attributes = new HashMap<>();
             attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+            attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
             attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
             attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
 
@@ -566,6 +567,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
         if (tracker.key != null && tracker.totalRecords == 1) {
             kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
         }
@@ -590,6 +592,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private static class BundleTracker {
 
         final long initialOffset;
+        final long initialTimestamp;
         final int partition;
         final String topic;
         final String key;
@@ -603,6 +606,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
         private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final
TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter)
{
             this.initialOffset = initialRecord.offset();
+            this.initialTimestamp = initialRecord.timestamp();
             this.partition = topicPartition.partition();
             this.topic = topicPartition.topic();
             this.recordWriter = recordWriter;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 2dfe1f0..3cde7ce 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";
+    static final String KAFKA_TIMESTAMP = "kafka.timestamp";
     static final String KAFKA_COUNT = "kafka.count";
     static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT",
"PLAINTEXT");
     static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
index 1a627dc..5ac2dfc 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
@@ -64,6 +64,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of
message if present and if single message. "
             + "How the key is encoded depends on the value of the 'Key Attribute Encoding'
property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset
of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The
timestamp of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The
partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic
the message or message bundle is from")
 })
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 815ed18..c61c549 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -463,6 +463,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         // If we are unable to parse the data, we need to transfer it to 'parse failure'
relationship
         final Map<String, String> attributes = getAttributes(consumerRecord);
         attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
         attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
         attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
 
@@ -623,6 +624,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
         if (tracker.key != null && tracker.totalRecords == 1) {
             kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
         }
@@ -647,6 +649,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private static class BundleTracker {
 
         final long initialOffset;
+        final long initialTimestamp;
         final int partition;
         final String topic;
         final String key;
@@ -660,6 +663,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
         private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final
TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter)
{
             this.initialOffset = initialRecord.offset();
+            this.initialTimestamp = initialRecord.timestamp();
             this.partition = topicPartition.partition();
             this.topic = topicPartition.topic();
             this.recordWriter = recordWriter;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index b9a6ae1..37c6599 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";
+    static final String KAFKA_TIMESTAMP = "kafka.timestamp";
     static final String KAFKA_COUNT = "kafka.count";
     static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT",
"PLAINTEXT");
     static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
index 9a831db..28a582c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
@@ -68,6 +68,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
     @WritesAttribute(attribute = "record.count", description = "The number of records received"),
     @WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided
by the configured Record Writer"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The
partition of the topic the records are from"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The
timestamp of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic
records are from")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
index 0245352..511d85f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
@@ -64,6 +64,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of
message if present and if single message. "
             + "How the key is encoded depends on the value of the 'Key Attribute Encoding'
property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset
of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The
timestamp of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The
partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic
the message or message bundle is from")
 })
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index ac88a4c..f6b0d94 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -463,6 +463,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         // If we are unable to parse the data, we need to transfer it to 'parse failure'
relationship
         final Map<String, String> attributes = getAttributes(consumerRecord);
         attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
         attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
         attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
 
@@ -623,6 +624,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
         if (tracker.key != null && tracker.totalRecords == 1) {
             kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
         }
@@ -647,6 +649,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private static class BundleTracker {
 
         final long initialOffset;
+        final long initialTimestamp;
         final int partition;
         final String topic;
         final String key;
@@ -660,6 +663,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
         private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final
TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter)
{
             this.initialOffset = initialRecord.offset();
+            this.initialTimestamp = initialRecord.timestamp();
             this.partition = topicPartition.partition();
             this.topic = topicPartition.topic();
             this.recordWriter = recordWriter;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 2dfe1f0..3cde7ce 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";
+    static final String KAFKA_TIMESTAMP = "kafka.timestamp";
     static final String KAFKA_COUNT = "kafka.count";
     static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT",
"PLAINTEXT");
     static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index 2cafd97..fb31e32 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -69,6 +69,7 @@ import java.util.regex.Pattern;
     @WritesAttribute(attribute = "record.count", description = "The number of records received"),
     @WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided
by the configured Record Writer"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The
partition of the topic the records are from"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The
timestamp of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic
records are from")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index 758ac87..13bebc9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -66,6 +66,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of
message if present and if single message. "
             + "How the key is encoded depends on the value of the 'Key Attribute Encoding'
property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset
of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The
timestamp of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The
partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic
the message or message bundle is from")
 })
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 5c32c82..2a90219 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -463,6 +463,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         // If we are unable to parse the data, we need to transfer it to 'parse failure'
relationship
         final Map<String, String> attributes = getAttributes(consumerRecord);
         attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
         attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
         attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
 
@@ -623,6 +624,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
         if (tracker.key != null && tracker.totalRecords == 1) {
             kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
         }
@@ -647,6 +649,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private static class BundleTracker {
 
         final long initialOffset;
+        final long initialTimestamp;
         final int partition;
         final String topic;
         final String key;
@@ -660,6 +663,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
         private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final
TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter)
{
             this.initialOffset = initialRecord.offset();
+            this.initialTimestamp = initialRecord.timestamp();
             this.partition = topicPartition.partition();
             this.topic = topicPartition.topic();
             this.recordWriter = recordWriter;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 2dfe1f0..3cde7ce 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";
+    static final String KAFKA_TIMESTAMP = "kafka.timestamp";
     static final String KAFKA_COUNT = "kafka.count";
     static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT",
"PLAINTEXT");
     static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");


Mime
View raw message