nifi-commits mailing list archives

From joew...@apache.org
Subject [1/2] nifi git commit: NIFI-2670: This closes #954. Expose option for how to interpret Kafka Key - hexadecimal encoding or UTF-8 String
Date Fri, 26 Aug 2016 02:18:08 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 102a9a2b7 -> 405252244


NIFI-2670: This closes #954. Expose option for how to interpret Kafka Key - hexadecimal encoding or UTF-8 String

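In short: the consume processors previously wrote the message key only as a hex string in a 'kafka.key.hex' attribute. This commit renames the attribute to 'kafka.key' and adds a 'Key Attribute Encoding' property ('UTF-8 Encoded' or 'Hex Encoded', defaulting to UTF-8) that controls how the raw key bytes are rendered. A minimal, self-contained sketch of the two encodings, mirroring the encodeKafkaKey(...) helper added below (the class name and main method here are illustrative only; javax.xml.bind.DatatypeConverter is what the patch itself uses):

    import java.nio.charset.StandardCharsets;
    import javax.xml.bind.DatatypeConverter;

    public class KeyEncodingSketch {
        // "hex" renders the raw key bytes as uppercase hexadecimal;
        // anything else (here: "utf-8") interprets them as a UTF-8 string.
        static String encode(final byte[] key, final String encoding) {
            if (key == null) {
                return null;
            }
            if ("hex".equals(encoding)) {
                return DatatypeConverter.printHexBinary(key);
            }
            return new String(key, StandardCharsets.UTF_8);
        }

        public static void main(String[] args) {
            final byte[] rawKey = "key1".getBytes(StandardCharsets.UTF_8);
            System.out.println(encode(rawKey, "utf-8")); // key1
            System.out.println(encode(rawKey, "hex"));   // 6B657931
        }
    }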

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58e0ce7f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58e0ce7f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58e0ce7f

Branch: refs/heads/master
Commit: 58e0ce7f92f708422276a68f4e04c61470c71b11
Parents: 102a9a2
Author: Mark Payne <markap14@hotmail.com>
Authored: Thu Aug 25 21:00:45 2016 -0400
Committer: joewitt <joewitt@apache.org>
Committed: Thu Aug 25 21:39:40 2016 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/ConsumeKafka_0_10.java         |  37 ++++-
 .../kafka/pubsub/KafkaProcessorUtils.java       |   2 +-
 .../kafka/pubsub/PublishKafka_0_10.java         |  50 +++++--
 .../kafka/pubsub/ConsumeKafkaTest.java          | 133 +++++++++++++++++
 .../kafka/pubsub/PublishKafkaTest.java          |  46 ++++++
 .../kafka/pubsub/StubPublishKafka.java          |  12 +-
 .../processors/kafka/pubsub/ConsumeKafka.java   |  37 ++++-
 .../kafka/pubsub/KafkaProcessorUtils.java       |   2 +-
 .../processors/kafka/pubsub/PublishKafka.java   |  57 ++++++--
 .../kafka/pubsub/ConsumeKafkaTest.java          | 146 ++++++++++++++++++-
 .../kafka/pubsub/PublishKafkaTest.java          |  46 ++++++
 .../kafka/pubsub/StubPublishKafka.java          |  19 ++-
 12 files changed, 538 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 53d6fcd..847f8a4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -62,7 +62,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
 @Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10"})
 @WritesAttributes({
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY_HEX, description = "The hex encoded key of message if present and if single message"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+        + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -82,6 +83,10 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
 
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -110,6 +115,15 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
             .defaultValue(OFFSET_LATEST.getValue())
             .build();
 
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -148,6 +162,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         descriptors.add(TOPICS);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
@@ -290,10 +305,24 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         }
     }
 
+    private String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null;    // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
     private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
         final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
         final String offset = String.valueOf(firstRecord.offset());
-        final String keyHex = (firstRecord.key() != null) ? DatatypeConverter.printHexBinary(firstRecord.key()) : null;
+        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
         final String topic = firstRecord.topic();
         final String partition = String.valueOf(firstRecord.partition());
         FlowFile flowFile = session.create();
@@ -309,8 +338,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         });
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyHex != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY_HEX, keyHex);
+        if (keyValue != null && records.size() == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
         }
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index c2cc32a..3ae7495 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -57,7 +57,7 @@ final class KafkaProcessorUtils {
 
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
 
-    static final String KAFKA_KEY_HEX = "kafka.key.hex";
+    static final String KAFKA_KEY = "kafka.key";
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
index 3ad2fc6..5175f13 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
@@ -104,6 +104,10 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
             "DefaultPartitioner", "Messages will be assigned to random partitions.");
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name")
@@ -155,6 +159,15 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -216,6 +229,7 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         _descriptors.add(TOPIC);
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
+        _descriptors.add(KEY_ATTRIBUTE_ENCODING);
         _descriptors.add(MESSAGE_DEMARCATOR);
         _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
@@ -449,26 +463,18 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
      * regardless if it has #FAILED* attributes set.
      */
     private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        String topicName;
-        byte[] keyBytes;
-        byte[] delimiterBytes = null;
+        final byte[] keyBytes = getMessageKey(flowFile, context);
+
+        final String topicName;
+        final byte[] delimiterBytes;
         int lastAckedMessageIndex = -1;
         if (this.isFailedFlowFile(flowFile)) {
             lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
             topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
-                    ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
             delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
                     ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-
         } else {
             topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-            keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8);
-            String keyHex = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY_HEX);
-            if (_key == null && keyHex != null && KafkaProcessorUtils.HEX_KEY_PATTERN.matcher(keyHex).matches()) {
-                keyBytes = DatatypeConverter.parseHexBinary(keyHex);
-            }
             delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
                     .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
         }
@@ -480,6 +486,26 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         return publishingContext;
     }
 
+    private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+        final String uninterpretedKey;
+        if (context.getProperty(KEY).isSet()) {
+            uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+        }
+
+        if (uninterpretedKey == null) {
+            return null;
+        }
+
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+            return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
+        }
+
+        return DatatypeConverter.parseHexBinary(uninterpretedKey);
+    }
+
     /**
      * Will remove FAILED_* attributes if FlowFile is no longer considered a
      * failed FlowFile

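The publish side is symmetric: the getMessageKey(...) method added above takes the key from the 'Kafka Key' property when set, otherwise from the 'kafka.key' attribute, and decodes it according to 'Key Attribute Encoding' before handing bytes to the producer. A minimal sketch of that decode step under the same assumptions (illustrative class and main; DatatypeConverter as in the patch):

    import java.nio.charset.StandardCharsets;
    import javax.xml.bind.DatatypeConverter;

    public class KeyDecodingSketch {
        // Turn the uninterpreted key string back into the byte[] sent to Kafka.
        static byte[] decode(final String uninterpretedKey, final String keyEncoding) {
            if (uninterpretedKey == null) {
                return null;
            }
            if ("utf-8".equals(keyEncoding)) {
                return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
            }
            // "hex": each pair of hex digits becomes one raw byte
            return DatatypeConverter.parseHexBinary(uninterpretedKey);
        }

        public static void main(String[] args) {
            // "6B657931" is the uppercase hex of the UTF-8 bytes of "key1"
            final byte[] fromHex = decode("6B657931", "hex");
            System.out.println(new String(fromHex, StandardCharsets.UTF_8)); // key1
        }
    }

With both processors set to 'Hex Encoded', a ConsumeKafka -> PublishKafka flow can therefore round-trip arbitrary binary keys without loss.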
http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index c172b03..a85563d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -301,6 +301,7 @@ public class ConsumeKafkaTest {
 
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
         final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
         final TopicPartition tPart = new TopicPartition(topic, partition);
@@ -314,6 +315,23 @@ public class ConsumeKafkaTest {
         return new ConsumerRecords(map);
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+
+        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
+            final byte[] key = entry.getKey();
+            final byte[] rawRecord = entry.getValue();
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
     private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
         final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
         for (final ConsumerRecords<byte[], byte[]> rec : records) {
@@ -493,4 +511,119 @@ public class ConsumeKafkaTest {
 
         assertNull(null, mockPool.actualCommitOffsets);
     }
+
+    @Test
+    public void validateUtf8Key() {
+        String groupName = "validateGetAllMessages";
+
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
+
+    @Test
+    public void validateHexKey() {
+        String groupName = "validateGetAllMessages";
+
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
+
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
 }

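A side note on how validateHexKey computes its expected attribute value: concatenating Integer.toHexString per character works here only because every character of "key1" is below 0x80 and at least 0x10, so each one prints as exactly two hex digits. An equivalent and less fragile formulation (an editorial suggestion, not part of the commit) is to use printHexBinary directly:

    import java.nio.charset.StandardCharsets;
    import javax.xml.bind.DatatypeConverter;

    public class ExpectedHexSketch {
        public static void main(String[] args) {
            final String viaChars = (Integer.toHexString('k') + Integer.toHexString('e')
                    + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
            final String viaConverter =
                    DatatypeConverter.printHexBinary("key1".getBytes(StandardCharsets.UTF_8));
            System.out.println(viaChars);                        // 6B657931
            System.out.println(viaConverter);                    // 6B657931
            System.out.println(viaChars.equals(viaConverter));   // true
        }
    }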
http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index 5480ea7..af0d343 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -17,6 +17,10 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -326,4 +330,46 @@ public class PublishKafkaTest {
         verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
         runner.shutdown();
     }
+
+    @Test
+    public void validateUtf8Key() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
+    }
+
+    @Test
+    public void validateHexKey() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING);
+        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+
+        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index 27d86f5..c009014 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -53,6 +54,7 @@ public class StubPublishKafka extends PublishKafka_0_10 {
     private final int ackCheckSize;
 
     private final ExecutorService executor = Executors.newCachedThreadPool();
+    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
 
     StubPublishKafka(int ackCheckSize) {
         this.ackCheckSize = ackCheckSize;
@@ -66,6 +68,10 @@ public class StubPublishKafka extends PublishKafka_0_10 {
         this.executor.shutdownNow();
     }
 
+    public Map<Object, Object> getMessagesSent() {
+        return new HashMap<>(msgsSent);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
@@ -107,7 +113,11 @@ public class StubPublishKafka extends PublishKafka_0_10 {
         when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
             @Override
             public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                if (record != null && record.key() != null) {
+                    msgsSent.put(record.key(), record.value());
+                }
+
                 String value = new String(record.value(), StandardCharsets.UTF_8);
                 if ("fail".equals(value) && !StubPublishKafka.this.failed) {
                     StubPublishKafka.this.failed = true;

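The StubPublishKafka change above exists so the new PublishKafkaTest cases can assert on the exact key/value bytes that reach the producer: the mocked send(...) records each ProducerRecord into a map before answering. A stripped-down sketch of that capture pattern (illustrative class; plain Mockito, using getArguments() instead of the version-specific getArgumentAt):

    import static org.mockito.Mockito.any;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CaptureSketch {
        // Returns a live map of every key/value pair "sent" through the mock.
        // byte[] keys hash by identity, which is fine for capturing records.
        @SuppressWarnings("unchecked")
        static Map<Object, Object> instrument(final Producer<byte[], byte[]> producer) {
            final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
            when(producer.send(any(ProducerRecord.class))).then(invocation -> {
                final ProducerRecord<byte[], byte[]> record =
                        (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
                if (record != null && record.key() != null) {
                    msgsSent.put(record.key(), record.value());
                }
                return (Future<RecordMetadata>) mock(Future.class);
            });
            return msgsSent;
        }
    }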
http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index e5255f5..0a3fe5d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -62,7 +62,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
 @Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
 @WritesAttributes({
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY_HEX, description = "The hex encoded key of message if present and if single message"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+        + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -82,6 +83,10 @@ public class ConsumeKafka extends AbstractProcessor {
 
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -110,6 +115,15 @@ public class ConsumeKafka extends AbstractProcessor {
             .defaultValue(OFFSET_LATEST.getValue())
             .build();
 
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -148,6 +162,7 @@ public class ConsumeKafka extends AbstractProcessor {
         descriptors.add(TOPICS);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
@@ -290,10 +305,24 @@ public class ConsumeKafka extends AbstractProcessor {
         }
     }
 
+    private String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null;    // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
     private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
         final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
         final String offset = String.valueOf(firstRecord.offset());
-        final String keyHex = (firstRecord.key() != null) ? DatatypeConverter.printHexBinary(firstRecord.key()) : null;
+        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
         final String topic = firstRecord.topic();
         final String partition = String.valueOf(firstRecord.partition());
         FlowFile flowFile = session.create();
@@ -309,8 +338,8 @@ public class ConsumeKafka extends AbstractProcessor {
         });
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyHex != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY_HEX, keyHex);
+        if (keyValue != null && records.size() == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
         }
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index fd747fc..c74ad18 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -57,7 +57,7 @@ final class KafkaProcessorUtils {
 
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
 
-    static final String KAFKA_KEY_HEX = "kafka.key.hex";
+    static final String KAFKA_KEY = "kafka.key";
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index 65f386e..4e1403d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -104,6 +104,10 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
             "DefaultPartitioner", "Messages will be assigned to random partitions.");
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name")
@@ -146,15 +150,23 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
     static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
             .name("kafka-key")
             .displayName("Kafka Key")
-            .description("The Key to use for the Message.  It will be serialized as UTF-8 bytes. "
-                    + "If not specified then the flow file attribute kafka.key.hex is used if present "
-                    + "and we're not demarcating. In that case the hex string is coverted to its byte"
-                    + "form and written as a byte[] key.")
+            .description("The Key to use for the Message. "
+                    + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+                    + "and we're not demarcating.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -216,6 +228,7 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         _descriptors.add(TOPIC);
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
+        _descriptors.add(KEY_ATTRIBUTE_ENCODING);
         _descriptors.add(MESSAGE_DEMARCATOR);
         _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
@@ -449,26 +462,18 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
      * regardless if it has #FAILED* attributes set.
      */
     private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        String topicName;
-        byte[] keyBytes;
-        byte[] delimiterBytes = null;
+        final byte[] keyBytes = getMessageKey(flowFile, context);
+
+        final String topicName;
+        final byte[] delimiterBytes;
         int lastAckedMessageIndex = -1;
         if (this.isFailedFlowFile(flowFile)) {
             lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
             topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
-                    ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
             delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
                     ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-
         } else {
             topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-            keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8);
-            String keyHex = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY_HEX);
-            if (_key == null && keyHex != null && KafkaProcessorUtils.HEX_KEY_PATTERN.matcher(keyHex).matches()) {
-                keyBytes = DatatypeConverter.parseHexBinary(keyHex);
-            }
             delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
                     .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
         }
@@ -480,6 +485,26 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         return publishingContext;
     }
 
+    private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+        final String uninterpretedKey;
+        if (context.getProperty(KEY).isSet()) {
+            uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+        }
+
+        if (uninterpretedKey == null) {
+            return null;
+        }
+
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+            return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
+        }
+
+        return DatatypeConverter.parseHexBinary(uninterpretedKey);
+    }
+
     /**
      * Will remove FAILED_* attributes if FlowFile is no longer considered a
      * failed FlowFile

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7874d4d..7e4b12c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -25,24 +31,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class ConsumeKafkaTest {
 
@@ -301,6 +302,7 @@ public class ConsumeKafkaTest {
 
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
         final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
         final TopicPartition tPart = new TopicPartition(topic, partition);
@@ -314,6 +316,23 @@ public class ConsumeKafkaTest {
         return new ConsumerRecords(map);
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+
+        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
+            final byte[] key = entry.getKey();
+            final byte[] rawRecord = entry.getValue();
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
     private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
         final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
         for (final ConsumerRecords<byte[], byte[]> rec : records) {
@@ -493,4 +512,119 @@ public class ConsumeKafkaTest {
 
         assertNull(null, mockPool.actualCommitOffsets);
     }
+
+    @Test
+    public void validateUtf8Key() {
+        String groupName = "validateGetAllMessages";
+
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+
+        ConsumeKafka proc = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
+
+    @Test
+    public void validateHexKey() {
+        String groupName = "validateGetAllMessages";
+
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+
+        ConsumeKafka proc = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, ConsumeKafka.HEX_ENCODING);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
+
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+
+
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index 07ae2da..d81f0c1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -17,6 +17,10 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -326,4 +330,46 @@ public class PublishKafkaTest {
         verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
         runner.shutdown();
     }
+
+    @Test
+    public void validateUtf8Key() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.KEY, "${myKey}");
+
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
+    }
+
+    @Test
+    public void validateHexKey() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.KEY_ATTRIBUTE_ENCODING, PublishKafka.HEX_ENCODING);
+        runner.setProperty(PublishKafka.KEY, "${myKey}");
+
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+
+        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/58e0ce7f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index 950d623..533655e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -16,13 +16,16 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.lang.reflect.Field;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -38,9 +41,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
 import org.mockito.Mockito;
-import static org.mockito.Mockito.mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -53,6 +54,7 @@ public class StubPublishKafka extends PublishKafka {
     private final int ackCheckSize;
 
     private final ExecutorService executor = Executors.newCachedThreadPool();
+    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
 
     StubPublishKafka(int ackCheckSize) {
         this.ackCheckSize = ackCheckSize;
@@ -66,6 +68,10 @@ public class StubPublishKafka extends PublishKafka {
         this.executor.shutdownNow();
     }
 
+    public Map<Object, Object> getMessagesSent() {
+        return new HashMap<>(msgsSent);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
@@ -82,6 +88,7 @@ public class StubPublishKafka extends PublishKafka {
             publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
             publisher.setAckWaitTime(15000);
             producer = mock(Producer.class);
+
             this.instrumentProducer(producer, false);
             Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
             kf.setAccessible(true);
@@ -107,7 +114,11 @@ public class StubPublishKafka extends PublishKafka {
         when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
             @Override
             public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                if (record != null && record.key() != null) {
+                    msgsSent.put(record.key(), record.value());
+                }
+
                 String value = new String(record.value(), StandardCharsets.UTF_8);
                 if ("fail".equals(value) && !StubPublishKafka.this.failed) {
                     StubPublishKafka.this.failed = true;

