nifi-commits mailing list archives

From joew...@apache.org
Subject [2/3] nifi git commit: NIFI-3739: This closes #1695. Added ConsumeKafkaRecord_0_10 and PublishKafkaRecord_0_10 processors
Date Tue, 02 May 2017 00:59:57 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index b375b34..3bf01eb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -16,21 +16,24 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.nifi.logging.ComponentLog;
-
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Pattern;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 
 /**
  * A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@@ -49,6 +52,8 @@ public class ConsumerPool implements Closeable {
     private final String keyEncoding;
     private final String securityProtocol;
     private final String bootstrapServers;
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -93,6 +98,8 @@ public class ConsumerPool implements Closeable {
         this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
         this.topics = Collections.unmodifiableList(topics);
         this.topicPattern = null;
+        this.readerFactory = null;
+        this.writerFactory = null;
     }
 
     public ConsumerPool(
@@ -115,6 +122,56 @@ public class ConsumerPool implements Closeable {
         this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
         this.topics = null;
         this.topicPattern = topics;
+        this.readerFactory = null;
+        this.writerFactory = null;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final Map<String, Object> kafkaProperties,
+            final Pattern topics,
+            final long maxWaitMillis,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = null;
+        this.keyEncoding = null;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = null;
+        this.topicPattern = topics;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final Map<String, Object> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = null;
+        this.keyEncoding = null;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = topics;
+        this.topicPattern = null;
     }
 
     /**
@@ -122,10 +179,12 @@ public class ConsumerPool implements Closeable {
      * initializes a new one if deemed necessary.
      *
      * @param session the session for which the consumer lease will be
-     * associated
+     *            associated
+     * @param processContext the ProcessContext for which the consumer
+     *            lease will be associated
      * @return consumer to use or null if not available or necessary
      */
-    public ConsumerLease obtainConsumer(final ProcessSession session) {
+    public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) {
         SimpleConsumerLease lease = pooledLeases.poll();
         if (lease == null) {
             final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
@@ -150,7 +209,8 @@ public class ConsumerPool implements Closeable {
               consumer.subscribe(topicPattern, lease);
             }
         }
-        lease.setProcessSession(session);
+        lease.setProcessSession(session, processContext);
+
         leasesObtainedCountRef.incrementAndGet();
         return lease;
     }
@@ -200,15 +260,24 @@ public class ConsumerPool implements Closeable {
 
         private final Consumer<byte[], byte[]> consumer;
         private volatile ProcessSession session;
+        private volatile ProcessContext processContext;
         private volatile boolean closedConsumer;
 
         private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
-            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
+            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, readerFactory, writerFactory, logger);
             this.consumer = consumer;
         }
 
-        void setProcessSession(final ProcessSession session) {
+        void setProcessSession(final ProcessSession session, final ProcessContext context) {
             this.session = session;
+            this.processContext = context;
+        }
+
+        @Override
+        public void yield() {
+            if (processContext != null) {
+                processContext.yield();
+            }
         }
 
         @Override
@@ -229,7 +298,7 @@ public class ConsumerPool implements Closeable {
             super.close();
             if (session != null) {
                 session.rollback();
-                setProcessSession(null);
+                setProcessSession(null, null);
             }
             if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
                 closedConsumer = true;
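
For context, the pool now optionally carries a RecordReaderFactory and RecordSetWriterFactory (left null by the existing demarcator-based constructors), and obtainConsumer() additionally takes the ProcessContext so a lease can call yield() on it. A minimal usage sketch, assuming a record-aware processor's onTrigger() with readerFactory, writerFactory, session, context and getLogger() already in scope (all names and values below are illustrative, not taken from this commit):

    final Map<String, Object> kafkaProps = new HashMap<>();
    kafkaProps.put("bootstrap.servers", "localhost:9092");    // placeholder broker

    final ConsumerPool pool = new ConsumerPool(
            10,                                        // maxConcurrentLeases
            readerFactory,                             // RecordReaderFactory controller service
            writerFactory,                             // RecordSetWriterFactory controller service
            kafkaProps,
            Collections.singletonList("my-topic"),     // List<String> topics variant
            1000L,                                     // maxWaitMillis
            "PLAINTEXT",                               // securityProtocol
            "localhost:9092",                          // bootstrapServers
            getLogger());

    // obtainConsumer() now takes the ProcessContext alongside the ProcessSession
    try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
        if (lease == null) {
            context.yield();                           // no consumer available right now
            return;
        }
        while (lease.continuePolling()) {
            lease.poll();
        }
        lease.commit();
    }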

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
new file mode 100644
index 0000000..f96a575
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.FlowFileFilters;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RecordWriter;
+
+@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. "
+    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
+    + " Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring"
+    + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the meantime"
+    + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. The complementary NiFi "
+    + "processor for fetching messages is ConsumeKafka_0_10_Record.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+    description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+    + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+    + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+    + "FlowFiles that are routed to success.")
+@SeeAlso({PublishKafka_0_10.class, ConsumeKafka_0_10.class, ConsumeKafkaRecord_0_10.class})
+public class PublishKafkaRecord_0_10 extends AbstractProcessor {
+    protected static final String MSG_COUNT = "msg.count";
+
+    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
+        "FlowFile will be routed to failure unless the message is replicated to the appropriate "
+            + "number of Kafka Nodes according to the Topic configuration");
+    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
+        "FlowFile will be routed to success if the message is received by a single Kafka node, "
+            + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
+            + "but can result in data loss if a Kafka node crashes");
+    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
+        "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
+            + "without waiting for a response. This provides the best performance but may result in data loss.");
+
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
+        Partitioners.RoundRobinPartitioner.class.getSimpleName(),
+        "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+            + "the next Partition to Partition 2, and so on, wrapping as necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+        "DefaultPartitioner", "Messages will be assigned to random partitions.");
+
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+
+    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+        .name("topic")
+        .displayName("Topic Name")
+        .description("The name of the Kafka Topic to publish to.")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("The Record Reader to use for incoming FlowFiles")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use in order to serialize the data before sending to Kafka")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder()
+        .name("message-key-field")
+        .displayName("Message Key Field")
+        .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
+
+    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+        .name("acks")
+        .displayName("Delivery Guarantee")
+        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+        .required(true)
+        .expressionLanguageSupported(false)
+        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+        .build();
+
+    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
+        .name("max.block.ms")
+        .displayName("Max Metadata Wait Time")
+        .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
+            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("5 sec")
+        .build();
+
+    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
+        .name("ack.wait.time")
+        .displayName("Acknowledgment Wait Time")
+        .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+            + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .defaultValue("5 secs")
+        .build();
+
+    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+        .name("max.request.size")
+        .displayName("Max Request Size")
+        .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .build();
+
+    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
+        .name("partitioner.class")
+        .displayName("Partitioner class")
+        .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+        .defaultValue(RANDOM_PARTITIONING.getValue())
+        .required(false)
+        .build();
+
+    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+        .name("compression.type")
+        .displayName("Compression Type")
+        .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .allowableValues("none", "gzip", "snappy", "lz4")
+        .defaultValue("none")
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles for which all content was sent to Kafka.")
+        .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+        .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES;
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    private volatile PublisherPool publisherPool = null;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        properties.add(TOPIC);
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
+        properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
+        properties.add(KafkaProcessorUtils.USER_KEYTAB);
+        properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        properties.add(DELIVERY_GUARANTEE);
+        properties.add(MESSAGE_KEY_FIELD);
+        properties.add(MAX_REQUEST_SIZE);
+        properties.add(ACK_WAIT_TIME);
+        properties.add(METADATA_WAIT_TIME);
+        properties.add(PARTITION_CLASS);
+        properties.add(COMPRESSION_CODEC);
+
+        PROPERTIES = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+            .name(propertyDescriptorName)
+            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+            .dynamic(true)
+            .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+    }
+
+    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
+        PublisherPool pool = publisherPool;
+        if (pool != null) {
+            return pool;
+        }
+
+        return publisherPool = createPublisherPool(context);
+    }
+
+    protected PublisherPool createPublisherPool(final ProcessContext context) {
+        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
+
+        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
+    }
+
+    @OnStopped
+    public void closePool() {
+        if (publisherPool != null) {
+            publisherPool.close();
+        }
+
+        publisherPool = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500));
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final PublisherPool pool = getPublisherPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }
+
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+
+        final long startTime = System.nanoTime();
+        try (final PublisherLease lease = pool.obtainPublisher()) {
+            // Send each FlowFile to Kafka asynchronously.
+            for (final FlowFile flowFile : flowFiles) {
+                if (!isScheduled()) {
+                    // If stopped, re-queue FlowFile instead of sending it
+                    session.transfer(flowFile);
+                    continue;
+                }
+
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
+
+                final RecordWriter writer;
+                try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
+                    writer = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class).createWriter(getLogger(), flowFile, in);
+                } catch (final Exception e) {
+                    getLogger().error("Failed to create a Record Writer for {}; routing to failure", new Object[] {flowFile, e});
+                    session.transfer(flowFile, REL_FAILURE);
+                    continue;
+                }
+
+                try {
+                    session.read(flowFile, new InputStreamCallback() {
+                        @Override
+                        public void process(final InputStream rawIn) throws IOException {
+                            try (final InputStream in = new BufferedInputStream(rawIn)) {
+                                final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger());
+                                lease.publish(flowFile, reader, writer, messageKeyField, topic);
+                            } catch (final SchemaNotFoundException | MalformedRecordException e) {
+                                throw new ProcessException(e);
+                            }
+                        }
+                    });
+                } catch (final Exception e) {
+                    // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
+                    lease.getTracker().fail(flowFile, e);
+                    continue;
+                }
+            }
+
+            // Complete the send
+            final PublishResult publishResult = lease.complete();
+
+            // Transfer any successful FlowFiles.
+            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
+
+                final int msgCount = publishResult.getSuccessfulMessageCount(success);
+                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+                session.adjustCounter("Messages Sent", msgCount, true);
+
+                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+                session.transfer(success, REL_SUCCESS);
+            }
+
+            // Transfer any failures.
+            for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
+                final int successCount = publishResult.getSuccessfulMessageCount(failure);
+                if (successCount > 0) {
+                    getLogger().error("Failed to send some messages for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
+                        new Object[] {failure, successCount, publishResult.getReasonForFailure(failure)});
+                } else {
+                    getLogger().error("Failed to send all message for {} to Kafka; routing to failure due to {}",
+                        new Object[] {failure, publishResult.getReasonForFailure(failure)});
+                }
+
+                session.transfer(failure, REL_FAILURE);
+            }
+        }
+    }
+}
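
A rough configuration sketch for the new processor, mirroring the controller-service wiring used in the unit tests added later in this commit (MockRecordParser and MockRecordWriter are the test utilities referenced by this commit; the broker address, topic and schema fields are placeholders):

    package org.apache.nifi.processors.kafka.pubsub;

    import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
    import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class PublishKafkaRecordConfigSketch {

        @Test
        public void configureProcessor() throws Exception {
            final TestRunner runner = TestRunners.newTestRunner(PublishKafkaRecord_0_10.class);
            runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:9092"); // placeholder
            runner.setProperty(PublishKafkaRecord_0_10.TOPIC, "my-topic");                // placeholder

            // Record Reader controller service
            final MockRecordParser reader = new MockRecordParser();
            reader.addSchemaField("name", RecordFieldType.STRING);
            reader.addSchemaField("age", RecordFieldType.INT);
            runner.addControllerService("reader", reader);
            runner.enableControllerService(reader);

            // Record Writer controller service
            final MockRecordWriter writer = new MockRecordWriter("name, age");
            runner.addControllerService("writer", writer);
            runner.enableControllerService(writer);

            runner.setProperty(PublishKafkaRecord_0_10.RECORD_READER, "reader");
            runner.setProperty(PublishKafkaRecord_0_10.RECORD_WRITER, "writer");

            runner.enqueue("John Doe, 48");
            runner.assertValid();
            // runner.run() would try to contact the configured broker; the real test
            // below stubs out the PublisherPool instead of connecting to Kafka.
        }
    }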

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index b67e8a8..f08f7a9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -17,11 +17,14 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
@@ -29,6 +32,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
@@ -38,6 +45,7 @@ public class PublisherLease implements Closeable {
     private final int maxMessageSize;
     private final long maxAckWaitMillis;
     private volatile boolean poisoned = false;
+    private final AtomicLong messagesSent = new AtomicLong(0L);
 
     private InFlightMessageTracker tracker;
 
@@ -85,7 +93,42 @@ public class PublisherLease implements Closeable {
         }
     }
 
-    private void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+    void publish(final FlowFile flowFile, final RecordReader reader, final RecordWriter writer, final String messageKeyField, final String topic) throws IOException {
+        if (tracker == null) {
+            tracker = new InFlightMessageTracker();
+        }
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+
+        Record record;
+        final RecordSet recordSet = reader.createRecordSet();
+
+        try {
+            while ((record = recordSet.next()) != null) {
+                baos.reset();
+                writer.write(record, baos);
+
+                final byte[] messageContent = baos.toByteArray();
+                final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
+                final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+
+                if (tracker.isFailed(flowFile)) {
+                    // If we have a failure, don't try to send anything else.
+                    return;
+                }
+            }
+        } catch (final TokenTooLargeException ttle) {
+            tracker.fail(flowFile, ttle);
+        } catch (final Exception e) {
+            tracker.fail(flowFile, e);
+            poison();
+            throw e;
+        }
+    }
+
+    protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
         final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
         producer.send(record, new Callback() {
             @Override
@@ -99,11 +142,17 @@ public class PublisherLease implements Closeable {
             }
         });
 
+        messagesSent.incrementAndGet();
         tracker.incrementSentCount(flowFile);
     }
 
+
     public PublishResult complete() {
         if (tracker == null) {
+            if (messagesSent.get() == 0L) {
+                return PublishResult.EMPTY;
+            }
+
             throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
         }
 
@@ -129,4 +178,8 @@ public class PublisherLease implements Closeable {
         producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
         tracker = null;
     }
+
+    public InFlightMessageTracker getTracker() {
+        return tracker;
+    }
 }
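
One behavioral nuance in PublisherLease: it now counts messagesSent, and complete() on a lease that never published anything returns PublishResult.EMPTY instead of throwing IllegalStateException. A minimal illustration, assuming a PublisherPool named pool is in scope (hypothetical caller, not part of the commit):

    try (final PublisherLease lease = pool.obtainPublisher()) {
        // Nothing was published on this lease, so complete() is now a clean no-op.
        final PublishResult result = lease.complete();   // returns PublishResult.EMPTY
        assert result.getSuccessfulFlowFiles().isEmpty();
        assert result.getFailedFlowFiles().isEmpty();
    }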

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index aa1d4e2..6da2282 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10
-org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10
\ No newline at end of file
+org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_0_10
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10
\ No newline at end of file
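
This services file is the standard java.util.ServiceLoader registration mechanism; the two new entries are what make PublishKafkaRecord_0_10 and ConsumeKafkaRecord_0_10 discoverable alongside the existing processors. A generic illustration of how such a file is consumed (NiFi's NAR class loading builds on this same mechanism; the class below is hypothetical):

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    public class ListRegisteredProcessors {
        public static void main(final String[] args) {
            // Iterates every Processor implementation listed in META-INF/services files
            // on the classpath, which now includes the two record-based Kafka processors.
            for (final Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }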

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 4a5c4fb..cc524dd 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,18 +16,8 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import org.junit.Before;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -35,15 +25,22 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
 public class ConsumeKafkaTest {
 
-    Consumer<byte[], byte[]> mockConsumer = null;
     ConsumerLease mockLease = null;
     ConsumerPool mockConsumerPool = null;
 
     @Before
     public void setup() {
-        mockConsumer = mock(Consumer.class);
         mockLease = mock(ConsumerLease.class);
         mockConsumerPool = mock(ConsumerPool.class);
     }
@@ -106,7 +103,7 @@ public class ConsumeKafkaTest {
     public void validateGetAllMessages() throws Exception {
         String groupName = "validateGetAllMessages";
 
-        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
         when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
         when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
@@ -124,7 +121,7 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
         runner.run(1, false);
 
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
         verify(mockLease, times(3)).continuePolling();
         verify(mockLease, times(2)).poll();
         verify(mockLease, times(1)).commit();
@@ -137,7 +134,7 @@ public class ConsumeKafkaTest {
     public void validateGetAllMessagesPattern() throws Exception {
         String groupName = "validateGetAllMessagesPattern";
 
-        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
         when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
         when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
@@ -156,7 +153,7 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
         runner.run(1, false);
 
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
         verify(mockLease, times(3)).continuePolling();
         verify(mockLease, times(2)).poll();
         verify(mockLease, times(1)).commit();
@@ -169,7 +166,7 @@ public class ConsumeKafkaTest {
     public void validateGetErrorMessages() throws Exception {
         String groupName = "validateGetErrorMessages";
 
-        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
         when(mockLease.continuePolling()).thenReturn(true, false);
         when(mockLease.commit()).thenReturn(Boolean.FALSE);
 
@@ -187,7 +184,7 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
         runner.run(1, false);
 
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
         verify(mockLease, times(2)).continuePolling();
         verify(mockLease, times(1)).poll();
         verify(mockLease, times(1)).commit();

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 0ebf2b3..12a137e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
@@ -36,6 +37,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -44,14 +47,16 @@ import static org.mockito.Mockito.when;
 
 public class ConsumerPoolTest {
 
-    Consumer<byte[], byte[]> consumer = null;
-    ProcessSession mockSession = null;
-    ProvenanceReporter mockReporter = null;
-    ConsumerPool testPool = null;
-    ConsumerPool testDemarcatedPool = null;
-    ComponentLog logger = null;
+    private Consumer<byte[], byte[]> consumer = null;
+    private ProcessSession mockSession = null;
+    private ProcessContext mockContext = Mockito.mock(ProcessContext.class);
+    private ProvenanceReporter mockReporter = null;
+    private ConsumerPool testPool = null;
+    private ConsumerPool testDemarcatedPool = null;
+    private ComponentLog logger = null;
 
     @Before
+    @SuppressWarnings("unchecked")
     public void setup() {
         consumer = mock(Consumer.class);
         logger = mock(ComponentLog.class);
@@ -94,16 +99,16 @@ public class ConsumerPoolTest {
     public void validatePoolSimpleCreateClose() throws Exception {
 
         when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
-        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
         }
-        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
         }
-        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
         }
-        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
         }
         testPool.close();
@@ -125,7 +130,7 @@ public class ConsumerPoolTest {
         final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
         when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
-        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
             lease.commit();
         }
@@ -142,7 +147,7 @@ public class ConsumerPoolTest {
     public void validatePoolSimpleBatchCreateClose() throws Exception {
         when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {
-            try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
                 for (int j = 0; j < 100; j++) {
                     lease.poll();
                 }
@@ -167,7 +172,7 @@ public class ConsumerPoolTest {
         final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
         when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
-        try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
+        try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
             lease.poll();
             lease.commit();
         }
@@ -184,7 +189,7 @@ public class ConsumerPoolTest {
     public void validatePoolConsumerFails() throws Exception {
 
         when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
-        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
             try {
                 lease.poll();
                 fail();

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
new file mode 100644
index 0000000..da63877
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestConsumeKafkaRecord_0_10 {
+
+    private ConsumerLease mockLease = null;
+    private ConsumerPool mockConsumerPool = null;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+
+        ConsumeKafkaRecord_0_10 proc = new ConsumeKafkaRecord_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+
+        final String readerId = "record-reader";
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+        runner.addControllerService(readerId, readerService);
+        runner.enableControllerService(readerService);
+
+        final String writerId = "record-writer";
+        final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
+        runner.addControllerService(writerId, writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ConsumeKafkaRecord_0_10.RECORD_READER, readerId);
+        runner.setProperty(ConsumeKafkaRecord_0_10.RECORD_WRITER, writerId);
+    }
+
+    @Test
+    public void validateCustomValidatorSettings() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
+        runner.assertNotValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafkaRecord_0_10.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because Group ID is required"));
+        }
+
+        runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void testJaasConfiguration() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.assertValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.assertValid();
+    }
+
+}
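
A note on the verification counts above: Mockito's consecutive stubbing hands out one value per call to the stubbed method, with the last value repeating afterwards, so thenReturn(TRUE, TRUE, FALSE) drives exactly two poll iterations before the loop stops. A minimal standalone sketch of the idiom; the Lease interface here is hypothetical and exists only for illustration:

    // Hypothetical interface, declared at class level purely to illustrate
    // Mockito's consecutive stubbing.
    interface Lease {
        boolean continuePolling();
        void poll();
    }

    // Inside a test method:
    final Lease lease = Mockito.mock(Lease.class);
    Mockito.when(lease.continuePolling()).thenReturn(true, true, false);

    int polls = 0;
    while (lease.continuePolling()) {   // true, true, then false
        lease.poll();
        polls++;
    }
    // polls == 2 and continuePolling() was invoked three times, matching the
    // times(2) / times(3) verifications in validateGetAllMessages() above.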

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
new file mode 100644
index 0000000..c1df792
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestPublishKafkaRecord_0_10 {
+
+    private static final String TOPIC_NAME = "unit-test";
+
+    private PublisherPool mockPool;
+    private PublisherLease mockLease;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException, IOException {
+        mockPool = mock(PublisherPool.class);
+        mockLease = mock(PublisherLease.class);
+        Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), any(String.class), any(String.class));
+
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+        runner = TestRunners.newTestRunner(new PublishKafkaRecord_0_10() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext context) {
+                return mockPool;
+            }
+        });
+
+        runner.setProperty(PublishKafkaRecord_0_10.TOPIC, TOPIC_NAME);
+
+        final String readerId = "record-reader";
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+        runner.addControllerService(readerId, readerService);
+        runner.enableControllerService(readerService);
+
+        final String writerId = "record-writer";
+        final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
+        runner.addControllerService(writerId, writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(PublishKafkaRecord_0_10.RECORD_READER, readerId);
+        runner.setProperty(PublishKafkaRecord_0_10.RECORD_WRITER, writerId);
+    }
+
+    @Test
+    public void testSingleSuccess() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
+
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleSuccess() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testSingleFailure() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleFailures() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleMessagesPerFlowFile() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
+        flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 29"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
+
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
+        runner.assertAllFlowFilesContainAttribute("msg.count");
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("10"))
+            .count());
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("20"))
+            .count());
+    }
+
+
+    @Test
+    public void testSomeSuccessSomeFailure() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final Map<FlowFile, Exception> failureMap = new HashMap<>();
+        failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
+        failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
+        runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
+
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
+            .filter(ff -> "10".equals(ff.getAttribute("msg.count")))
+            .count());
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
+            .filter(ff -> "20".equals(ff.getAttribute("msg.count")))
+            .count());
+
+        assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_FAILURE).stream()
+            .noneMatch(ff -> ff.getAttribute("msg.count") != null));
+    }
+
+
+    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+        return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
+    }
+
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        for (final FlowFile ff : successfulFlowFiles) {
+            msgCounts.put(ff, msgCountPerFlowFile);
+        }
+        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
+    }
+
+    private PublishResult createFailurePublishResult(final FlowFile failure) {
+        return createFailurePublishResult(Collections.singleton(failure));
+    }
+
+    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
+        final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
+        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
+    }
+
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+        // sanity check.
+        for (final FlowFile success : successFlowFiles) {
+            if (failures.containsKey(success)) {
+                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
+            }
+        }
+
+        return new PublishResult() {
+            @Override
+            public Collection<FlowFile> getSuccessfulFlowFiles() {
+                return successFlowFiles;
+            }
+
+            @Override
+            public Collection<FlowFile> getFailedFlowFiles() {
+                return failures.keySet();
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                Integer count = msgCounts.get(flowFile);
+                return count == null ? 0 : count.intValue();
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
new file mode 100644
index 0000000..b47bece
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
+    private final List<Object[]> records = new ArrayList<>();
+    private final List<RecordField> fields = new ArrayList<>();
+    private final int failAfterN;
+
+    public MockRecordParser() {
+        this(-1);
+    }
+
+    public MockRecordParser(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+
+    public void addSchemaField(final String fieldName, final RecordFieldType type) {
+        fields.add(new RecordField(fieldName, type.getDataType()));
+    }
+
+    public void addRecord(Object... values) {
+        records.add(values);
+    }
+
+    @Override
+    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+
+        return new RecordReader() {
+            private int recordCount = 0;
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Record nextRecord() throws IOException, MalformedRecordException {
+                if (failAfterN >= 0 && recordCount >= failAfterN) {
+                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
+                }
+                final String line = reader.readLine();
+                if (line == null) {
+                    return null;
+                }
+
+                recordCount++;
+
+                final String[] values = line.split(",");
+                final Map<String, Object> valueMap = new HashMap<>();
+                int i = 0;
+                for (final RecordField field : fields) {
+                    final String fieldName = field.getFieldName();
+                    valueMap.put(fieldName, values[i++].trim());
+                }
+
+                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+            }
+
+            @Override
+            public RecordSchema getSchema() {
+                return new SimpleRecordSchema(fields);
+            }
+        };
+    }
+}
\ No newline at end of file
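
For orientation, a rough sketch of driving MockRecordParser directly, outside the TestRunner wiring used in the tests above. The field names and input text are illustrative, the fragment assumes the java.io and Mockito imports already present in these test classes, and it belongs inside a test method declared with throws Exception. Note that this mock stores every parsed value as a trimmed String, regardless of the declared field type:

    final MockRecordParser parser = new MockRecordParser();
    parser.addSchemaField("name", RecordFieldType.STRING);
    parser.addSchemaField("age", RecordFieldType.INT);

    final InputStream in = new ByteArrayInputStream("John Doe, 48\nJane Doe, 47".getBytes());
    final RecordReader reader = parser.createRecordReader(null, in, Mockito.mock(ComponentLog.class));

    Record record;
    while ((record = reader.nextRecord()) != null) {
        // Each line becomes a MapRecord keyed by the configured field names.
        System.out.println(record.getAsString("name") + " / " + record.getAsString("age"));
    }
    reader.close();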

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
new file mode 100644
index 0000000..22d7249
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+    private final String header;
+    private final int failAfterN;
+    private final boolean quoteValues;
+
+    public MockRecordWriter(final String header) {
+        this(header, true, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues) {
+        this(header, quoteValues, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
+        this.header = header;
+        this.quoteValues = quoteValues;
+        this.failAfterN = failAfterN;
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) {
+        return new RecordSetWriter() {
+            @Override
+            public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+                out.write(header.getBytes());
+                out.write("\n".getBytes());
+
+                int recordCount = 0;
+                final int numCols = rs.getSchema().getFieldCount();
+                Record record = null;
+                while ((record = rs.next()) != null) {
+                    if (++recordCount > failAfterN && failAfterN > -1) {
+                        throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
+                    }
+
+                    int i = 0;
+                    for (final String fieldName : record.getSchema().getFieldNames()) {
+                        final String val = record.getAsString(fieldName);
+                        if (quoteValues) {
+                            out.write("\"".getBytes());
+                            if (val != null) {
+                                out.write(val.getBytes());
+                            }
+                            out.write("\"".getBytes());
+                        } else if (val != null) {
+                            out.write(val.getBytes());
+                        }
+
+                        if (i++ < numCols - 1) {
+                            out.write(",".getBytes());
+                        }
+                    }
+                    out.write("\n".getBytes());
+                }
+
+                return WriteResult.of(recordCount, Collections.emptyMap());
+            }
+
+            @Override
+            public String getMimeType() {
+                return "text/plain";
+            }
+
+            @Override
+            public WriteResult write(Record record, OutputStream out) throws IOException {
+                return null;
+            }
+        };
+    }
+}
\ No newline at end of file
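
Similarly, a rough sketch of the output MockRecordWriter produces when driven directly. It assumes the static RecordSet.of(...) factory from the record API for building an in-memory record set, reuses the record classes imported in these test utilities, and the schema and values are illustrative:

    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
    fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(fields);

    final Map<String, Object> values = new HashMap<>();
    values.put("name", "John Doe");
    values.put("age", 48);

    final MockRecordWriter factory = new MockRecordWriter("name, age");
    final RecordSetWriter writer = factory.createWriter(Mockito.mock(ComponentLog.class), null, null);

    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    writer.write(RecordSet.of(schema, new MapRecord(schema, values)), out);

    // With quoteValues defaulting to true, the expected output is:
    //   name, age
    //   "John Doe","48"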

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
index 8fcb016..f48d0f5 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 import org.apache.avro.LogicalType;
@@ -53,6 +55,7 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
 public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
     private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
     private final Map<String, String> schemaNameToSchemaMap;
+    private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();
 
     private static final String LOGICAL_TYPE_DATE = "date";
     private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
@@ -65,6 +68,21 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
     }
 
     @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (newValue == null) {
+            recordSchemas.remove(descriptor.getName());
+        } else {
+            try {
+                final Schema avroSchema = new Schema.Parser().parse(newValue);
+                final RecordSchema recordSchema = createRecordSchema(avroSchema, newValue, descriptor.getName());
+                recordSchemas.put(descriptor.getName(), recordSchema);
+            } catch (final Exception e) {
+                // not a problem - the service won't be valid and the validation message will indicate what is wrong.
+            }
+        }
+    }
+
+    @Override
     public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException {
         final String schemaText = schemaNameToSchemaMap.get(schemaName);
         if (schemaText == null) {
@@ -76,9 +94,11 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
 
     @Override
     public RecordSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException {
-        final String schemaText = retrieveSchemaText(schemaName);
-        final Schema schema = new Schema.Parser().parse(schemaText);
-        return createRecordSchema(schema, schemaText, schemaName);
+        final RecordSchema recordSchema = recordSchemas.get(schemaName);
+        if (recordSchema == null) {
+            throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
+        }
+        return recordSchema;
     }
 
     @Override
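
The practical effect of this change is that retrieveSchema() now serves a RecordSchema that was parsed and cached when the dynamic property was set, rather than re-parsing the Avro text on every lookup, and an unknown name fails fast with SchemaNotFoundException. A rough sketch of that interaction, written as a test-method fragment with a hypothetical property name and a deliberately small schema:

    final AvroSchemaRegistry registry = new AvroSchemaRegistry();
    final PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
        .name("fooSchema")
        .dynamic(true)
        .build();

    final String schemaText = "{\"type\": \"record\", \"name\": \"User\", \"fields\": ["
        + "{\"name\": \"name\", \"type\": \"string\"}]}";

    // Normally invoked by the framework when the dynamic property is set; the Avro
    // text is parsed once and the resulting RecordSchema is cached.
    registry.onPropertyModified(fooSchema, null, schemaText);

    final RecordSchema cached = registry.retrieveSchema("fooSchema");   // served from the cache

    // Clearing the property evicts the cached entry; a later lookup for "fooSchema"
    // would now throw SchemaNotFoundException.
    registry.onPropertyModified(fooSchema, schemaText, null);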

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
index a63097a..9121f04 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
@@ -21,15 +21,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -69,44 +65,4 @@ public class TestAvroSchemaRegistry {
 
         delegate.close();
     }
-
-
-    @Test
-    public void validateRecordSchemaRetrieval() throws Exception {
-        String schemaName = "fooSchema";
-        ConfigurationContext configContext = mock(ConfigurationContext.class);
-        Map<PropertyDescriptor, String> properties = new HashMap<>();
-        PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
-            .name(schemaName)
-            .dynamic(true)
-            .build();
-        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
-            + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
-            + "{\"name\": \"favorite_number\",  \"type\": \"int\"}, "
-            + "{\"name\": \"foo\",  \"type\": \"boolean\"}, "
-            + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
-        PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
-            .name("barSchema")
-            .dynamic(false)
-            .build();
-        properties.put(fooSchema, fooSchemaText);
-        properties.put(barSchema, "");
-        when(configContext.getProperties()).thenReturn(properties);
-        AvroSchemaRegistry delegate = new AvroSchemaRegistry();
-        delegate.enable(configContext);
-
-        RecordSchema locatedSchema = delegate.retrieveSchema(schemaName);
-        List<RecordField> recordFields = locatedSchema.getFields();
-        assertEquals(4, recordFields.size());
-        assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(0).getDataType());
-        assertEquals("name", recordFields.get(0).getFieldName());
-        assertEquals(RecordFieldType.INT.getDataType(), recordFields.get(1).getDataType());
-        assertEquals("favorite_number", recordFields.get(1).getFieldName());
-        assertEquals(RecordFieldType.BOOLEAN.getDataType(), recordFields.get(2).getDataType());
-        assertEquals("foo", recordFields.get(2).getFieldName());
-        assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(3).getDataType());
-        assertEquals("favorite_color", recordFields.get(3).getFieldName());
-        delegate.close();
-    }
-
 }

