nifi-commits mailing list archives

From marka...@apache.org
Subject [nifi] branch main updated: NIFI-7953: Updated ConsumeKafka_2_0/ConsumeKafkaRecord_2_0/ConsumeKafka_2_6/ConsumeKafkaRecord_2_6 to allow separating records by key
Date Tue, 27 Oct 2020 23:04:27 GMT
This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c610aab  NIFI-7953: Updated ConsumeKafka_2_0/ConsumeKafkaRecord_2_0/ConsumeKafka_2_6/ConsumeKafkaRecord_2_6 to allow separating records by key
c610aab is described below

commit c610aab3cb017d7030381f2715de446923870966
Author: Mark Payne <markap14@hotmail.com>
AuthorDate: Tue Oct 27 12:46:12 2020 -0400

    NIFI-7953: Updated ConsumeKafka_2_0/ConsumeKafkaRecord_2_0/ConsumeKafka_2_6/ConsumeKafkaRecord_2_6 to allow separating records by key
---
 .../java/org/apache/nifi/util/MockFlowFile.java    |  3 +-
 .../kafka/pubsub/ConsumeKafkaRecord_2_0.java       | 49 +++++++++++++++++-----
 .../processors/kafka/pubsub/ConsumeKafka_2_0.java  | 17 +++++++-
 .../processors/kafka/pubsub/ConsumerLease.java     | 37 ++++++++++------
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 33 ++++++++++-----
 .../kafka/pubsub/KafkaProcessorUtils.java          |  2 +
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |  2 +
 .../kafka/pubsub/ConsumeKafkaRecord_2_6.java       | 31 +++++++++++++-
 .../processors/kafka/pubsub/ConsumeKafka_2_6.java  | 18 +++++++-
 .../processors/kafka/pubsub/ConsumerLease.java     | 33 ++++++++++-----
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 21 +++++++---
 .../kafka/pubsub/KafkaProcessorUtils.java          |  5 +--
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |  2 +
 13 files changed, 194 insertions(+), 59 deletions(-)
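
Editor's sketch (not part of the commit): the change summarized above adds a "Separate By Key" property to all four consumers, plus a "Key Attribute Encoding" property on the record variants. A minimal configuration sketch using nifi-mock's TestRunner; the property names are taken from the descriptors added in the diffs below, the broker address and topic are hypothetical, and no Kafka connection is made:

import org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class SeparateByKeyConfigSketch {
    public static void main(final String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(ConsumeKafka_2_6.class);
        runner.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        runner.setProperty("topic", "events");                     // hypothetical topic
        runner.setProperty("group.id", "nifi-consumer");
        runner.setProperty("message-demarcator", "\n"); // key separation applies to demarcated bundles
        runner.setProperty("separate-by-key", "true");  // new in NIFI-7953
        runner.assertValid();
    }
}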

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 375158c..254320e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -226,7 +226,8 @@ public class MockFlowFile implements FlowFileRecord {
     }
 
     public void assertAttributeEquals(final String attributeName, final String expectedValue) {
-        Assert.assertEquals(expectedValue, attributes.get(attributeName));
+        Assert.assertEquals("Expected attribute " + attributeName + " to be " + expectedValue + " but instead it was " + attributes.get(attributeName),
+            expectedValue, attributes.get(attributeName));
     }
 
     public void assertAttributeNotEquals(final String attributeName, final String expectedValue) {
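
The richer failure message above pays off in any consumer test that asserts on the new kafka.key attribute. A hypothetical fragment; the attribute value shown is illustrative:

import org.apache.nifi.util.MockFlowFile;

class KeyAssertionExample {
    // Before this change a mismatch printed only the expected/actual pair;
    // now it also names the attribute, e.g.
    // "Expected attribute kafka.key to be order-1234 but instead it was order-9999".
    static void assertKeyAttribute(final MockFlowFile flowFile) {
        flowFile.assertAttributeEquals("kafka.key", "order-1234");
    }
}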
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index 7f5c75a..79824b1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -34,7 +34,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -57,6 +56,12 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.0 Consumer API. "
     + "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_0. Please note that, at this time, the Processor assumes that "
     + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
@@ -77,7 +82,7 @@ import java.util.regex.Pattern;
         description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.",
-        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
+        expressionLanguageScope = VARIABLE_REGISTRY)
 @SeeAlso({ConsumeKafka_2_0.class, PublishKafka_2_0.class, PublishKafkaRecord_2_0.class})
 public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
 
@@ -93,7 +98,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
             .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();
 
     static final PropertyDescriptor TOPIC_TYPE = new Builder()
@@ -110,7 +115,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
         .displayName("Record Reader")
         .description("The Record Reader to use for incoming FlowFiles")
         .identifiesControllerService(RecordReaderFactory.class)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .required(true)
         .build();
 
@@ -119,7 +124,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
         .displayName("Record Writer")
         .description("The Record Writer to use in order to serialize the data before sending to Kafka")
         .identifiesControllerService(RecordSetWriterFactory.class)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .required(true)
         .build();
 
@@ -129,7 +134,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
             .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();
 
     static final PropertyDescriptor AUTO_OFFSET_RESET = new Builder()
@@ -179,7 +184,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
             + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If "
             + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait "
             + "for the producer to finish its entire transaction instead of pulling as the messages become available.")
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .allowableValues("true", "false")
         .defaultValue("true")
         .required(true)
@@ -203,8 +208,25 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
             + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
             + "the messages together efficiently.")
         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
+        .required(false)
+        .build();
+    static final PropertyDescriptor SEPARATE_BY_KEY = new Builder()
+        .name("separate-by-key")
+        .displayName("Separate By Key")
+        .description("If true, two Records will only be added to the same FlowFile if both of the Kafka Messages have identical keys.")
         .required(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+        .name("key-attribute-encoding")
+        .displayName("Key Attribute Encoding")
+        .description("If the <Separate By Key> property is set to true, FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY +
+            "'. This property dictates how the value of the attribute should be encoded.")
+        .required(true)
+        .defaultValue(UTF8_ENCODING.getValue())
+        .allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE)
         .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -242,6 +264,8 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
         descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
         descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
         descriptors.add(GROUP_ID);
+        descriptors.add(SEPARATE_BY_KEY);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(AUTO_OFFSET_RESET);
         descriptors.add(MESSAGE_HEADER_ENCODING);
         descriptors.add(HEADER_NAME_REGEX);
@@ -283,7 +307,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
                 .name(propertyDescriptorName)
                 .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
                 .dynamic(true)
-                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                .expressionLanguageSupported(VARIABLE_REGISTRY)
                 .build();
     }
 
@@ -328,6 +352,9 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
         final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
         final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
 
+        final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+
         if (topicType.equals(TOPIC_NAME.getValue())) {
             for (final String topic : topicListing.split(",", 100)) {
                 final String trimmedName = topic.trim();
@@ -337,11 +364,11 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
             }
 
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
         } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
             final Pattern topicPattern = Pattern.compile(topicListing.trim());
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
         } else {
             getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
             return null;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index 13bebc9..fc00693 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -146,6 +146,16 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
                     + "will result in a single FlowFile which  "
                     + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
             .build();
+
+    static final PropertyDescriptor SEPARATE_BY_KEY = new PropertyDescriptor.Builder()
+        .name("separate-by-key")
+        .displayName("Separate By Key")
+        .description("If true, and the <Message Demarcator> property is set, two messages will only be added to the same FlowFile if both of the Kafka Messages have identical keys.")
+        .required(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+
     static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
         .name("header-name-regex")
         .displayName("Headers to Add as Attributes (Regex)")
@@ -234,6 +244,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
         descriptors.add(AUTO_OFFSET_RESET);
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
+        descriptors.add(SEPARATE_BY_KEY);
         descriptors.add(MESSAGE_HEADER_ENCODING);
         descriptors.add(HEADER_NAME_REGEX);
         descriptors.add(MAX_POLL_RECORDS);
@@ -315,6 +326,8 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
         final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
         final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
 
+        final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
+
         if (topicType.equals(TOPIC_NAME.getValue())) {
             for (final String topic : topicListing.split(",", 100)) {
                 final String trimmedName = topic.trim();
@@ -323,11 +336,11 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
                 }
             }
 
-            return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
+            return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
                 bootstrapServers, log, honorTransactions, charset, headerNamePattern);
         } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
             final Pattern topicPattern = Pattern.compile(topicListing.trim());
-            return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
+            return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
                 bootstrapServers, log, honorTransactions, charset, headerNamePattern);
         } else {
             getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 2674dd9..3ecec49 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -49,6 +49,7 @@ import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -83,6 +84,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private final RecordReaderFactory readerFactory;
     private final Charset headerCharacterSet;
     private final Pattern headerNamePattern;
+    private final boolean separateByKey;
     private boolean poisoned = false;
     //used for tracking demarcated flowfiles to their TopicPartition so we can append
     //to them on subsequent poll calls
@@ -103,7 +105,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             final RecordSetWriterFactory writerFactory,
             final ComponentLog logger,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final boolean separateByKey) {
         this.maxWaitMillis = maxWaitMillis;
         this.kafkaConsumer = kafkaConsumer;
         this.demarcatorBytes = demarcatorBytes;
@@ -115,6 +118,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         this.logger = logger;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
     }
 
     /**
@@ -164,7 +168,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
      * flowfiles necessary or appends to existing ones if in demarcation mode.
      */
     void poll() {
-        /**
+        /*
          * Implementation note:
          * Even if ConsumeKafka is not scheduled to poll due to downstream connection back-pressure is engaged,
          * for longer than session.timeout.ms (defaults to 10 sec), Kafka consumer sends heartbeat from background thread.
@@ -202,7 +206,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             return false;
         }
         try {
-            /**
+            /*
              * Committing the nifi session then the offsets means we have an at
              * least once guarantee here. If we reversed the order we'd have at
              * most once.
@@ -412,7 +416,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
         // Group the Records by their BundleInformation
         final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream()
-            .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec))));
+            .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec), separateByKey ? rec.key() : null)));
 
         for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
             final BundleInformation bundleInfo = entry.getKey();
@@ -538,7 +542,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         while ((record = reader.nextRecord()) != null) {
                             // Determine the bundle for this record.
                             final RecordSchema recordSchema = record.getSchema();
-                            final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+                            final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
 
                             BundleTracker tracker = bundleMap.get(bundleInfo);
                             if (tracker == null) {
@@ -626,9 +630,16 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
-        if (tracker.key != null && tracker.totalRecords == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+
+        // If we have a kafka key, we will add it as an attribute only if
+        // the FlowFile contains a single Record, or if the Records have been separated by Key,
+        // because we then know that even though there are multiple Records, they all have the same key.
+        if (tracker.key != null && (tracker.totalRecords == 1 || separateByKey)) {
+            if (!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue())) {
+                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+            }
         }
+
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
         if (tracker.totalRecords > 1) {
@@ -647,8 +658,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         tracker.updateFlowFile(newFlowFile);
     }
 
-    private static class BundleTracker {
 
+    private static class BundleTracker {
         final long initialOffset;
         final long initialTimestamp;
         final int partition;
@@ -678,23 +689,24 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         private void updateFlowFile(final FlowFile flowFile) {
             this.flowFile = flowFile;
         }
-
     }
 
     private static class BundleInformation {
         private final TopicPartition topicPartition;
         private final RecordSchema schema;
         private final Map<String, String> attributes;
+        private final byte[] messageKey;
 
-        public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes) {
+        public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes, final byte[] messageKey) {
             this.topicPartition = topicPartition;
             this.schema = schema;
             this.attributes = attributes;
+            this.messageKey = messageKey;
         }
 
         @Override
         public int hashCode() {
-            return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 * attributes.hashCode());
+            return 41 + Objects.hash(topicPartition, schema, attributes) + 37 * Arrays.hashCode(messageKey);
         }
 
         @Override
@@ -710,7 +722,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             }
 
             final BundleInformation other = (BundleInformation) obj;
-            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes);
+            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes)
+                && Arrays.equals(this.messageKey, other.messageKey);
         }
     }
 }
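
The heart of the demarcated path is the Collectors.groupingBy call in writeDemarcatedData above: when separateByKey is set, the raw message key joins the grouping criteria, so records with different keys can never share a FlowFile. A stripped-down, hypothetical model of that grouping, with Msg and Bundle standing in for ConsumerRecord and BundleInformation:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByKeySketch {
    static final class Msg {
        final byte[] key; final String value;
        Msg(final String key, final String value) { this.key = key.getBytes(StandardCharsets.UTF_8); this.value = value; }
    }
    // The wrapper gives the byte[] key value-based equality, as BundleInformation now does.
    static final class Bundle {
        final byte[] key;
        Bundle(final byte[] key) { this.key = key; }
        @Override public boolean equals(final Object o) { return o instanceof Bundle && Arrays.equals(key, ((Bundle) o).key); }
        @Override public int hashCode() { return Arrays.hashCode(key); }
    }

    public static void main(final String[] args) {
        final boolean separateByKey = true;
        final List<Msg> records = Arrays.asList(new Msg("a", "m1"), new Msg("b", "m2"), new Msg("a", "m3"));
        final Map<Bundle, List<Msg>> bundles = records.stream()
            .collect(Collectors.groupingBy(rec -> new Bundle(separateByKey ? rec.key : null)));
        System.out.println(bundles.size()); // 2 bundles: key "a" holds m1 and m3, key "b" holds m2
    }
}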
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 7f02b26..0462729 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -59,6 +59,7 @@ public class ConsumerPool implements Closeable {
     private final RecordSetWriterFactory writerFactory;
     private final Charset headerCharacterSet;
     private final Pattern headerNamePattern;
+    private final boolean separateByKey;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -86,6 +87,7 @@ public class ConsumerPool implements Closeable {
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
+            final boolean separateByKey,
             final Map<String, Object> kafkaProperties,
             final List<String> topics,
             final long maxWaitMillis,
@@ -111,11 +113,13 @@ public class ConsumerPool implements Closeable {
         this.honorTransactions = honorTransactions;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
     }
 
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
+            final boolean separateByKey,
             final Map<String, Object> kafkaProperties,
             final Pattern topics,
             final long maxWaitMillis,
@@ -141,6 +145,7 @@ public class ConsumerPool implements Closeable {
         this.honorTransactions = honorTransactions;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
     }
 
     public ConsumerPool(
@@ -155,12 +160,13 @@ public class ConsumerPool implements Closeable {
             final ComponentLog logger,
             final boolean honorTransactions,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final boolean separateByKey,
+            final String keyEncoding) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
         this.demarcatorBytes = null;
-        this.keyEncoding = null;
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.securityProtocol = securityProtocol;
@@ -171,6 +177,8 @@ public class ConsumerPool implements Closeable {
         this.honorTransactions = honorTransactions;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
+        this.keyEncoding = keyEncoding;
     }
 
     public ConsumerPool(
@@ -185,12 +193,13 @@ public class ConsumerPool implements Closeable {
             final ComponentLog logger,
             final boolean honorTransactions,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final boolean separateByKey,
+            final String keyEncoding) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
         this.demarcatorBytes = null;
-        this.keyEncoding = null;
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.securityProtocol = securityProtocol;
@@ -201,6 +210,8 @@ public class ConsumerPool implements Closeable {
         this.honorTransactions = honorTransactions;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
+        this.keyEncoding = keyEncoding;
     }
 
     /**
@@ -218,7 +229,8 @@ public class ConsumerPool implements Closeable {
         if (lease == null) {
             final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
             consumerCreatedCountRef.incrementAndGet();
-            /**
+
+            /*
              * For now return a new consumer lease. But we could later elect to
              * have this return null if we determine the broker indicates that
              * the lag time on all topics being monitored is sufficiently low.
@@ -228,10 +240,9 @@ public class ConsumerPool implements Closeable {
              * sitting idle which could prompt excessive rebalances.
              */
             lease = new SimpleConsumerLease(consumer);
-            /**
-             * This subscription tightly couples the lease to the given
-             * consumer. They cannot be separated from then on.
-             */
+
+            // This subscription tightly couples the lease to the given
+            // consumer. They cannot be separated from then on.
             if (topics != null) {
               consumer.subscribe(topics, lease);
             } else {
@@ -268,7 +279,7 @@ public class ConsumerPool implements Closeable {
     public void close() {
         final List<SimpleConsumerLease> leases = new ArrayList<>();
         pooledLeases.drainTo(leases);
-        leases.stream().forEach((lease) -> {
+        leases.forEach((lease) -> {
             lease.close(true);
         });
     }
@@ -301,7 +312,7 @@ public class ConsumerPool implements Closeable {
 
         private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
             super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
-                readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern);
+                readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey);
             this.consumer = consumer;
         }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index e756776..b089fce 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -60,6 +60,8 @@ public final class KafkaProcessorUtils {
     static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
     static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
             "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+    static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new AllowableValue("do-not-add", "Do Not Add Key as Attribute",
+        "The key will not be added as an Attribute");
 
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
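
These AllowableValues back the new Key Attribute Encoding property on the record consumers. A sketch of what the two non-suppressing encodings would produce for a given key (the hex form uses uppercase letters, per the description above); the sample bytes are arbitrary:

import java.nio.charset.StandardCharsets;

public class KeyEncodingSketch {
    public static void main(final String[] args) {
        final byte[] key = {0x4E, 0x69, 0x46, 0x69}; // spells "NiFi"

        // utf-8: interpret the key bytes as a UTF-8 string
        final String utf8 = new String(key, StandardCharsets.UTF_8);

        // hex: uppercase hexadecimal, two characters per byte
        final StringBuilder hex = new StringBuilder();
        for (final byte b : key) {
            hex.append(String.format("%02X", b & 0xFF));
        }

        System.out.println(utf8); // NiFi
        System.out.println(hex);  // 4E694669
        // do-not-add: the kafka.key attribute is simply omitted
    }
}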
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 9d53ee6..d006a6e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -70,6 +70,7 @@ public class ConsumerPoolTest {
         testPool = new ConsumerPool(
                 1,
                 null,
+                false,
                 Collections.emptyMap(),
                 Collections.singletonList("nifi"),
                 100L,
@@ -88,6 +89,7 @@ public class ConsumerPoolTest {
         testDemarcatedPool = new ConsumerPool(
                 1,
                 "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                false,
                 Collections.emptyMap(),
                 Collections.singletonList("nifi"),
                 100L,
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 8043058..3e7b16a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -57,6 +57,10 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. "
     + "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_6. Please note that, at this time, the Processor assumes that "
     + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
@@ -207,6 +211,24 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
         .required(false)
         .build();
 
+    static final PropertyDescriptor SEPARATE_BY_KEY = new Builder()
+        .name("separate-by-key")
+        .displayName("Separate By Key")
+        .description("If true, two Records will only be added to the same FlowFile if both of the Kafka Messages have identical keys.")
+        .required(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+        .name("key-attribute-encoding")
+        .displayName("Key Attribute Encoding")
+        .description("If the <Separate By Key> property is set to true, FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY +
+            "'. This property dictates how the value of the attribute should be encoded.")
+        .required(true)
+        .defaultValue(UTF8_ENCODING.getValue())
+        .allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE)
+        .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles received from Kafka.  Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@@ -242,6 +264,8 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
         descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
         descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
         descriptors.add(GROUP_ID);
+        descriptors.add(SEPARATE_BY_KEY);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(AUTO_OFFSET_RESET);
         descriptors.add(MESSAGE_HEADER_ENCODING);
         descriptors.add(HEADER_NAME_REGEX);
@@ -328,6 +352,9 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
         final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
         final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
 
+        final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+
         if (topicType.equals(TOPIC_NAME.getValue())) {
             for (final String topic : topicListing.split(",", 100)) {
                 final String trimmedName = topic.trim();
@@ -337,11 +364,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
             }
 
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
         } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
             final Pattern topicPattern = Pattern.compile(topicListing.trim());
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
         } else {
             getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
             return null;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index c96bb60..5461abb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -146,6 +146,17 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
                     + "will result in a single FlowFile which  "
                     + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
             .build();
+
+
+    static final PropertyDescriptor SEPARATE_BY_KEY = new PropertyDescriptor.Builder()
+        .name("separate-by-key")
+        .displayName("Separate By Key")
+        .description("If true, and the <Message Demarcator> property is set, two messages will only be added to the same FlowFile if both of the Kafka Messages have identical keys.")
+        .required(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+
     static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
         .name("header-name-regex")
         .displayName("Headers to Add as Attributes (Regex)")
@@ -234,6 +245,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
         descriptors.add(AUTO_OFFSET_RESET);
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
+        descriptors.add(SEPARATE_BY_KEY);
         descriptors.add(MESSAGE_HEADER_ENCODING);
         descriptors.add(HEADER_NAME_REGEX);
         descriptors.add(MAX_POLL_RECORDS);
@@ -315,6 +327,8 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
         final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
         final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
 
+        final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
+
         if (topicType.equals(TOPIC_NAME.getValue())) {
             for (final String topic : topicListing.split(",", 100)) {
                 final String trimmedName = topic.trim();
@@ -323,11 +337,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
                 }
             }
 
-            return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
+            return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
                 bootstrapServers, log, honorTransactions, charset, headerNamePattern);
         } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
             final Pattern topicPattern = Pattern.compile(topicListing.trim());
-            return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
+            return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
                 bootstrapServers, log, honorTransactions, charset, headerNamePattern);
         } else {
             getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 458165b..c3846a2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -49,6 +49,7 @@ import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -83,6 +84,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private final RecordReaderFactory readerFactory;
     private final Charset headerCharacterSet;
     private final Pattern headerNamePattern;
+    private final boolean separateByKey;
     private boolean poisoned = false;
     //used for tracking demarcated flowfiles to their TopicPartition so we can append
     //to them on subsequent poll calls
@@ -103,7 +105,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             final RecordSetWriterFactory writerFactory,
             final ComponentLog logger,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final boolean separateByKey) {
         this.maxWaitMillis = maxWaitMillis;
         this.kafkaConsumer = kafkaConsumer;
         this.demarcatorBytes = demarcatorBytes;
@@ -115,6 +118,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         this.logger = logger;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
     }
 
     /**
@@ -412,7 +416,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
         // Group the Records by their BundleInformation
         final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream()
-            .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec))));
+            .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec), separateByKey ? rec.key() : null)));
 
         for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
             final BundleInformation bundleInfo = entry.getKey();
@@ -538,7 +542,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         while ((record = reader.nextRecord()) != null) {
                             // Determine the bundle for this record.
                             final RecordSchema recordSchema = record.getSchema();
-                            final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+                            final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
 
                             BundleTracker tracker = bundleMap.get(bundleInfo);
                             if (tracker == null) {
@@ -626,9 +630,16 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
-        if (tracker.key != null && tracker.totalRecords == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+
+        // If we have a kafka key, we will add it as an attribute only if
+        // the FlowFile contains a single Record, or if the Records have been separated by Key,
+        // because we then know that even though there are multiple Records, they all have the same key.
+        if (tracker.key != null && (tracker.totalRecords == 1 || separateByKey)) {
+            if (!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue())) {
+                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+            }
         }
+
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
         if (tracker.totalRecords > 1) {
@@ -647,8 +658,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         tracker.updateFlowFile(newFlowFile);
     }
 
-    private static class BundleTracker {
 
+    private static class BundleTracker {
         final long initialOffset;
         final long initialTimestamp;
         final int partition;
@@ -678,23 +689,24 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         private void updateFlowFile(final FlowFile flowFile) {
             this.flowFile = flowFile;
         }
-
     }
 
     private static class BundleInformation {
         private final TopicPartition topicPartition;
         private final RecordSchema schema;
         private final Map<String, String> attributes;
+        private final byte[] messageKey;
 
-        public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes) {
+        public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes, final byte[] messageKey) {
             this.topicPartition = topicPartition;
             this.schema = schema;
             this.attributes = attributes;
+            this.messageKey = messageKey;
         }
 
         @Override
         public int hashCode() {
-            return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 * attributes.hashCode());
+            return 41 + Objects.hash(topicPartition, schema, attributes) + 37 * Arrays.hashCode(messageKey);
         }
 
         @Override
@@ -710,7 +722,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             }
 
             final BundleInformation other = (BundleInformation) obj;
-            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes);
+            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes)
+                && Arrays.equals(this.messageKey, other.messageKey);
         }
     }
 }
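
The move to Arrays.hashCode/Arrays.equals for messageKey (here and in the 2.0 copy of BundleInformation) is essential: Java arrays inherit identity-based equals and hashCode from Object, so two byte-identical keys read from Kafka would otherwise land in different bundles. A minimal demonstration:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ByteArrayKeyDemo {
    public static void main(final String[] args) {
        final byte[] a = "key-1".getBytes(StandardCharsets.UTF_8);
        final byte[] b = "key-1".getBytes(StandardCharsets.UTF_8);

        System.out.println(a.equals(b));                              // false (identity comparison)
        System.out.println(a.hashCode() == b.hashCode());             // false in practice (identity hashes)
        System.out.println(Arrays.equals(a, b));                      // true (content comparison)
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true (content hash)
    }
}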
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 7f02b26..2a33298 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -59,6 +59,7 @@ public class ConsumerPool implements Closeable {
     private final RecordSetWriterFactory writerFactory;
     private final Charset headerCharacterSet;
     private final Pattern headerNamePattern;
+    private final boolean separateByKey;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -86,6 +87,7 @@ public class ConsumerPool implements Closeable {
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
+            final boolean separateByKey,
             final Map<String, Object> kafkaProperties,
             final List<String> topics,
             final long maxWaitMillis,
@@ -111,11 +113,13 @@ public class ConsumerPool implements Closeable {
         this.honorTransactions = honorTransactions;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
     }
 
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
+            final boolean separateByKey,
             final Map<String, Object> kafkaProperties,
             final Pattern topics,
             final long maxWaitMillis,
@@ -141,6 +145,7 @@ public class ConsumerPool implements Closeable {
         this.honorTransactions = honorTransactions;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
     }
 
     public ConsumerPool(
@@ -155,12 +160,13 @@ public class ConsumerPool implements Closeable {
             final ComponentLog logger,
             final boolean honorTransactions,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final boolean separateByKey,
+            final String keyEncoding) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
         this.demarcatorBytes = null;
-        this.keyEncoding = null;
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.securityProtocol = securityProtocol;
@@ -171,6 +177,8 @@ public class ConsumerPool implements Closeable {
         this.honorTransactions = honorTransactions;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
+        this.keyEncoding = keyEncoding;
     }
 
     public ConsumerPool(
@@ -185,12 +193,13 @@ public class ConsumerPool implements Closeable {
             final ComponentLog logger,
             final boolean honorTransactions,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final boolean separateByKey,
+            final String keyEncoding) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
         this.demarcatorBytes = null;
-        this.keyEncoding = null;
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.securityProtocol = securityProtocol;
@@ -201,6 +210,8 @@ public class ConsumerPool implements Closeable {
         this.honorTransactions = honorTransactions;
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
+        this.separateByKey = separateByKey;
+        this.keyEncoding = keyEncoding;
     }
 
     /**
@@ -301,7 +312,7 @@ public class ConsumerPool implements Closeable {
 
         private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
             super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
-                readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern);
+                readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey);
             this.consumer = consumer;
         }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index e756776..44a6984 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -50,7 +50,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-import java.util.regex.Pattern;
 
 public final class KafkaProcessorUtils {
     private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
@@ -60,8 +59,8 @@ public final class KafkaProcessorUtils {
     static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
     static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
             "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
-    static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
+    static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new AllowableValue("do-not-add", "Do Not Add Key as Attribute",
+        "The key will not be added as an Attribute");
 
     static final String KAFKA_KEY = "kafka.key";
     static final String KAFKA_TOPIC = "kafka.topic";
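
Together with the ConsumerLease changes above, the attribute logic reduces to a single predicate: kafka.key is written only when it is unambiguous for the whole FlowFile and the user has not chosen the "do-not-add" encoding. A condensed restatement with simplified names, not the literal method:

import java.util.HashMap;
import java.util.Map;

public class KeyAttributeSketch {
    static Map<String, String> keyAttribute(final String key, final long totalRecords,
                                            final boolean separateByKey, final String keyEncoding) {
        final Map<String, String> attrs = new HashMap<>();
        // Unambiguous: a single record, or all records bundled together share the same key.
        final boolean unambiguous = totalRecords == 1 || separateByKey;
        if (key != null && unambiguous && !"do-not-add".equalsIgnoreCase(keyEncoding)) {
            attrs.put("kafka.key", key);
        }
        return attrs;
    }

    public static void main(final String[] args) {
        System.out.println(keyAttribute("k1", 5, true,  "utf-8"));      // {kafka.key=k1}
        System.out.println(keyAttribute("k1", 5, false, "utf-8"));      // {}
        System.out.println(keyAttribute("k1", 1, false, "do-not-add")); // {}
    }
}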
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 9d53ee6..d006a6e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -70,6 +70,7 @@ public class ConsumerPoolTest {
         testPool = new ConsumerPool(
                 1,
                 null,
+                false,
                 Collections.emptyMap(),
                 Collections.singletonList("nifi"),
                 100L,
@@ -88,6 +89,7 @@ public class ConsumerPoolTest {
         testDemarcatedPool = new ConsumerPool(
                 1,
                 "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                false,
                 Collections.emptyMap(),
                 Collections.singletonList("nifi"),
                 100L,

