nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [3/6] nifi git commit: NIFI-4600: This closes #2312. Added nifi-kafka-1-0-nar and nifi-kafka-1-0-processors modules
Date Mon, 04 Dec 2017 21:52:45 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
new file mode 100644
index 0000000..6f17bd5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.FlowFileFilters;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 1.0 Producer API."
+    + "The messages to send may be individual FlowFiles or may be delimited, using a "
+    + "user-specified delimiter, such as a new-line. "
+    + "The complementary NiFi processor for fetching messages is ConsumeKafka_1_0.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+    description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+    + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
+    + "be greater than 1.")
+public class PublishKafka_1_0 extends AbstractProcessor {
+    protected static final String MSG_COUNT = "msg.count";
+
+    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
+        "FlowFile will be routed to failure unless the message is replicated to the appropriate "
+            + "number of Kafka Nodes according to the Topic configuration");
+    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
+        "FlowFile will be routed to success if the message is received by a single Kafka node, "
+            + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
+            + "but can result in data loss if a Kafka node crashes");
+    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
+        "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
+            + "without waiting for a response. This provides the best performance but may result in data loss.");
+
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
+        Partitioners.RoundRobinPartitioner.class.getSimpleName(),
+        "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+            + "the next Partition to Partition 2, and so on, wrapping as necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+        "DefaultPartitioner", "Messages will be assigned to random partitions.");
+
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+
+    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+        .name("topic")
+        .displayName("Topic Name")
+        .description("The name of the Kafka Topic to publish to.")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+
+    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+        .name(ProducerConfig.ACKS_CONFIG)
+        .displayName("Delivery Guarantee")
+        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+        .required(true)
+        .expressionLanguageSupported(false)
+        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+        .build();
+
+    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
+        .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
+        .displayName("Max Metadata Wait Time")
+        .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
+            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("5 sec")
+        .build();
+
+    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
+        .name("ack.wait.time")
+        .displayName("Acknowledgment Wait Time")
+        .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+            + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .defaultValue("5 secs")
+        .build();
+
+    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+        .name("max.request.size")
+        .displayName("Max Request Size")
+        .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .build();
+
+    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+        .name("kafka-key")
+        .displayName("Kafka Key")
+        .description("The Key to use for the Message. "
+            + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+            + "and we're not demarcating.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+        .name("key-attribute-encoding")
+        .displayName("Key Attribute Encoding")
+        .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+        .required(true)
+        .defaultValue(UTF8_ENCODING.getValue())
+        .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+        .build();
+
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+        .name("message-demarcator")
+        .displayName("Message Demarcator")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
+            + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
+            + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
+            + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.")
+        .build();
+
+    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
+        .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
+        .displayName("Partitioner class")
+        .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+        .defaultValue(RANDOM_PARTITIONING.getValue())
+        .required(false)
+        .build();
+
+    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+        .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
+        .displayName("Compression Type")
+        .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .allowableValues("none", "gzip", "snappy", "lz4")
+        .defaultValue("none")
+        .build();
+
+    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
+        .name("attribute-name-regex")
+        .displayName("Attributes to Send as Headers (Regex)")
+        .description("A Regular Expression that is matched against all FlowFile attribute names. "
+            + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
+            + "If not specified, no FlowFile attributes will be added as headers.")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(false)
+        .build();
+    static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
+        .name("use-transactions")
+        .displayName("Use Transactions")
+        .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
+            + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
+            + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
+            + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
+        .name("message-header-encoding")
+        .displayName("Message Header Encoding")
+        .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
+            + "this property indicates the Character Encoding to use for serializing the headers.")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .required(false)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles for which all content was sent to Kafka.")
+        .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+        .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES;
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    private volatile PublisherPool publisherPool = null;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        properties.add(TOPIC);
+        properties.add(DELIVERY_GUARANTEE);
+        properties.add(USE_TRANSACTIONS);
+        properties.add(ATTRIBUTE_NAME_REGEX);
+        properties.add(MESSAGE_HEADER_ENCODING);
+        properties.add(KEY);
+        properties.add(KEY_ATTRIBUTE_ENCODING);
+        properties.add(MESSAGE_DEMARCATOR);
+        properties.add(MAX_REQUEST_SIZE);
+        properties.add(ACK_WAIT_TIME);
+        properties.add(METADATA_WAIT_TIME);
+        properties.add(PARTITION_CLASS);
+        properties.add(COMPRESSION_CODEC);
+
+        PROPERTIES = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+            .name(propertyDescriptorName)
+            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+            .dynamic(true)
+            .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+        results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
+
+        final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
+        if (useTransactions) {
+            final String deliveryGuarantee = validationContext.getProperty(DELIVERY_GUARANTEE).getValue();
+            if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Delivery Guarantee")
+                    .valid(false)
+                    .explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" "
+                        + "Either change the <Use Transactions> property or the <Delivery Guarantee> property.")
+                    .build());
+            }
+        }
+
+        return results;
+    }
+
+    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
+        PublisherPool pool = publisherPool;
+        if (pool != null) {
+            return pool;
+        }
+
+        return publisherPool = createPublisherPool(context);
+    }
+
+    protected PublisherPool createPublisherPool(final ProcessContext context) {
+        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+        final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
+        final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
+        final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+
+        final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
+        final Charset charset = Charset.forName(charsetName);
+
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
+
+        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, attributeNamePattern, charset);
+    }
+
+    @OnStopped
+    public void closePool() {
+        if (publisherPool != null) {
+            publisherPool.close();
+        }
+
+        publisherPool = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
+
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final PublisherPool pool = getPublisherPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }
+
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+
+        final long startTime = System.nanoTime();
+        try (final PublisherLease lease = pool.obtainPublisher()) {
+            if (useTransactions) {
+                lease.beginTransaction();
+            }
+
+            // Send each FlowFile to Kafka asynchronously.
+            for (final FlowFile flowFile : flowFiles) {
+                if (!isScheduled()) {
+                    // If stopped, re-queue FlowFile instead of sending it
+                    if (useTransactions) {
+                        session.rollback();
+                        lease.rollback();
+                        return;
+                    }
+
+                    session.transfer(flowFile);
+                    continue;
+                }
+
+                final byte[] messageKey = getMessageKey(flowFile, context);
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                final byte[] demarcatorBytes;
+                if (useDemarcator) {
+                    demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
+                } else {
+                    demarcatorBytes = null;
+                }
+
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream rawIn) throws IOException {
+                        try (final InputStream in = new BufferedInputStream(rawIn)) {
+                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
+                        }
+                    }
+                });
+            }
+
+            // Complete the send
+            final PublishResult publishResult = lease.complete();
+
+            if (publishResult.isFailure()) {
+                getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
+                session.transfer(flowFiles, REL_FAILURE);
+                return;
+            }
+
+            // Transfer any successful FlowFiles.
+            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            for (FlowFile success : flowFiles) {
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
+
+                final int msgCount = publishResult.getSuccessfulMessageCount(success);
+                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+                session.adjustCounter("Messages Sent", msgCount, true);
+
+                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+                session.transfer(success, REL_SUCCESS);
+            }
+        }
+    }
+
+
+    private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+        if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
+            return null;
+        }
+
+        final String uninterpretedKey;
+        if (context.getProperty(KEY).isSet()) {
+            uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+        }
+
+        if (uninterpretedKey == null) {
+            return null;
+        }
+
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+            return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
+        }
+
+        return DatatypeConverter.parseHexBinary(uninterpretedKey);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
new file mode 100644
index 0000000..1f7c3ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface PublishResult {
+
+    boolean isFailure();
+
+    int getSuccessfulMessageCount(FlowFile flowFile);
+
+    Exception getReasonForFailure(FlowFile flowFile);
+
+    public static PublishResult EMPTY = new PublishResult() {
+        @Override
+        public boolean isFailure() {
+            return false;
+        }
+
+        @Override
+        public int getSuccessfulMessageCount(FlowFile flowFile) {
+            return 0;
+        }
+
+        @Override
+        public Exception getReasonForFailure(FlowFile flowFile) {
+            return null;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
new file mode 100644
index 0000000..abcd15f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Headers;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+public class PublisherLease implements Closeable {
+    private final ComponentLog logger;
+    private final Producer<byte[], byte[]> producer;
+    private final int maxMessageSize;
+    private final long maxAckWaitMillis;
+    private final boolean useTransactions;
+    private final Pattern attributeNameRegex;
+    private final Charset headerCharacterSet;
+    private volatile boolean poisoned = false;
+    private final AtomicLong messagesSent = new AtomicLong(0L);
+
+    private volatile boolean transactionsInitialized = false;
+    private volatile boolean activeTransaction = false;
+
+    private InFlightMessageTracker tracker;
+
+    public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger,
+        final boolean useTransactions, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
+        this.producer = producer;
+        this.maxMessageSize = maxMessageSize;
+        this.logger = logger;
+        this.maxAckWaitMillis = maxAckWaitMillis;
+        this.useTransactions = useTransactions;
+        this.attributeNameRegex = attributeNameRegex;
+        this.headerCharacterSet = headerCharacterSet;
+    }
+
+    protected void poison() {
+        this.poisoned = true;
+    }
+
+    public boolean isPoisoned() {
+        return poisoned;
+    }
+
+    void beginTransaction() {
+        if (!useTransactions) {
+            return;
+        }
+
+        if (!transactionsInitialized) {
+            producer.initTransactions();
+            transactionsInitialized = true;
+        }
+
+        producer.beginTransaction();
+        activeTransaction = true;
+    }
+
+    void rollback() {
+        if (!useTransactions || !activeTransaction) {
+            return;
+        }
+
+        producer.abortTransaction();
+        activeTransaction = false;
+    }
+
+    void fail(final FlowFile flowFile, final Exception cause) {
+        getTracker().fail(flowFile, cause);
+        rollback();
+    }
+
+    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
+        if (tracker == null) {
+            tracker = new InFlightMessageTracker(logger);
+        }
+
+        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+            byte[] messageContent;
+            try {
+                while ((messageContent = demarcator.nextToken()) != null) {
+                    // We do not want to use any key if we have a demarcator because that would result in
+                    // the key being the same for multiple messages
+                    final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
+                    publish(flowFile, keyToUse, messageContent, topic, tracker);
+
+                    if (tracker.isFailed(flowFile)) {
+                        // If we have a failure, don't try to send anything else.
+                        return;
+                    }
+                }
+            } catch (final TokenTooLargeException ttle) {
+                tracker.fail(flowFile, ttle);
+            }
+        } catch (final Exception e) {
+            tracker.fail(flowFile, e);
+            poison();
+            throw e;
+        }
+    }
+
+    void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
+        final String messageKeyField, final String topic) throws IOException {
+        if (tracker == null) {
+            tracker = new InFlightMessageTracker(logger);
+        }
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+
+        Record record;
+        int recordCount = 0;
+
+        try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
+            while ((record = recordSet.next()) != null) {
+                recordCount++;
+                baos.reset();
+
+                writer.write(record);
+                writer.flush();
+
+                final byte[] messageContent = baos.toByteArray();
+                final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
+                final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+
+                if (tracker.isFailed(flowFile)) {
+                    // If we have a failure, don't try to send anything else.
+                    return;
+                }
+            }
+
+            if (recordCount == 0) {
+                tracker.trackEmpty(flowFile);
+            }
+        } catch (final TokenTooLargeException ttle) {
+            tracker.fail(flowFile, ttle);
+        } catch (final SchemaNotFoundException snfe) {
+            throw new IOException(snfe);
+        } catch (final Exception e) {
+            tracker.fail(flowFile, e);
+            poison();
+            throw e;
+        }
+    }
+
+    private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
+        if (attributeNameRegex == null) {
+            return;
+        }
+
+        final Headers headers = record.headers();
+        for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
+            if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+                headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
+            }
+        }
+    }
+
+    protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
+        addHeaders(flowFile, record);
+
+        producer.send(record, new Callback() {
+            @Override
+            public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                if (exception == null) {
+                    tracker.incrementAcknowledgedCount(flowFile);
+                } else {
+                    tracker.fail(flowFile, exception);
+                    poison();
+                }
+            }
+        });
+
+        messagesSent.incrementAndGet();
+        tracker.incrementSentCount(flowFile);
+    }
+
+
+    public PublishResult complete() {
+        if (tracker == null) {
+            if (messagesSent.get() == 0L) {
+                return PublishResult.EMPTY;
+            }
+
+            rollback();
+            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
+        }
+
+        producer.flush();
+
+        if (activeTransaction) {
+            producer.commitTransaction();
+            activeTransaction = false;
+        }
+
+        try {
+            tracker.awaitCompletion(maxAckWaitMillis);
+            return tracker.createPublishResult();
+        } catch (final InterruptedException e) {
+            logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
+            Thread.currentThread().interrupt();
+            return tracker.failOutstanding(e);
+        } catch (final TimeoutException e) {
+            logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
+            return tracker.failOutstanding(e);
+        } finally {
+            tracker = null;
+        }
+    }
+
+    @Override
+    public void close() {
+        producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
+        tracker = null;
+    }
+
+    public InFlightMessageTracker getTracker() {
+        if (tracker == null) {
+            tracker = new InFlightMessageTracker(logger);
+        }
+
+        return tracker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
new file mode 100644
index 0000000..d5caa8d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.nifi.logging.ComponentLog;
+
+public class PublisherPool implements Closeable {
+    private final ComponentLog logger;
+    private final BlockingQueue<PublisherLease> publisherQueue;
+    private final Map<String, Object> kafkaProperties;
+    private final int maxMessageSize;
+    private final long maxAckWaitMillis;
+    private final boolean useTransactions;
+    private final Pattern attributeNameRegex;
+    private final Charset headerCharacterSet;
+
+    private volatile boolean closed = false;
+
+    PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis,
+        final boolean useTransactions, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
+        this.logger = logger;
+        this.publisherQueue = new LinkedBlockingQueue<>();
+        this.kafkaProperties = kafkaProperties;
+        this.maxMessageSize = maxMessageSize;
+        this.maxAckWaitMillis = maxAckWaitMillis;
+        this.useTransactions = useTransactions;
+        this.attributeNameRegex = attributeNameRegex;
+        this.headerCharacterSet = headerCharacterSet;
+    }
+
+    public PublisherLease obtainPublisher() {
+        if (isClosed()) {
+            throw new IllegalStateException("Connection Pool is closed");
+        }
+
+        PublisherLease lease = publisherQueue.poll();
+        if (lease != null) {
+            return lease;
+        }
+
+        lease = createLease();
+        return lease;
+    }
+
+    private PublisherLease createLease() {
+        final Map<String, Object> properties = new HashMap<>(kafkaProperties);
+        if (useTransactions) {
+            properties.put("transactional.id", UUID.randomUUID().toString());
+        }
+
+        final Producer<byte[], byte[]> producer = new KafkaProducer<>(properties);
+
+        final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex, headerCharacterSet) {
+            @Override
+            public void close() {
+                if (isPoisoned() || isClosed()) {
+                    super.close();
+                } else {
+                    publisherQueue.offer(this);
+                }
+            }
+        };
+
+        return lease;
+    }
+
+    public synchronized boolean isClosed() {
+        return closed;
+    }
+
+    @Override
+    public synchronized void close() {
+        closed = true;
+
+        PublisherLease lease;
+        while ((lease = publisherQueue.poll()) != null) {
+            lease.close();
+        }
+    }
+
+    /**
+     * Returns the number of leases that are currently available
+     *
+     * @return the number of leases currently available
+     */
+    protected int available() {
+        return publisherQueue.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..ea9d84d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0
+org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0/additionalDetails.html
new file mode 100644
index 0000000..1fd6449
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0/additionalDetails.html
@@ -0,0 +1,143 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>ConsumeKafka</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description</h2>
+        <p>
+            This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+            for data using KafkaConsumer API available with Kafka 1.0. When a message is received 
+            from Kafka, the message will be deserialized using the configured Record Reader, and then
+            written to a FlowFile by serializing the message with the configured Record Writer.
+        </p>
+
+
+        <h2>Security Configuration:</h2>
+        <p>
+            The Security Protocol property allows the user to specify the protocol for communicating
+            with the Kafka broker. The following sections describe each of the protocols in further detail.
+        </p>
+        <h3>PLAINTEXT</h3>
+        <p>
+            This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+            In order to use this option the broker must be configured with a listener of the form:
+        <pre>
+    PLAINTEXT://host.name:port
+            </pre>
+        </p>
+        <h3>SSL</h3>
+        <p>
+            This option provides an encrypted connection to the broker, with optional client authentication. In order
+            to use this option the broker must be configured with a listener of the form:
+        <pre>
+    SSL://host.name:port
+            </pre>
+        In addition, the processor must have an SSL Context Service selected.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+            not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+            a truststore containing the public key of the certificate authority used to sign the broker's key.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+            In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+            a truststore as described above.
+        </p>
+        <h3>SASL_PLAINTEXT</h3>
+        <p>
+            This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+        <pre>
+    SASL_PLAINTEXT://host.name:port
+            </pre>
+        In addition, the Kerberos Service Name must be specified in the processor.
+        </p>
+        <h4>SASL_PLAINTEXT - GSSAPI</h4>
+        <p>
+            If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+            JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+            NiFi's bootstrap.conf, such as:
+        <pre>
+    java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+            </pre>
+        </p>
+        <p>
+            An example of the JAAS config file would be the following:
+        <pre>
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/path/to/nifi.keytab"
+        serviceName="kafka"
+        principal="nifi@YOURREALM.COM";
+    };
+            </pre>
+        <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+        </p>
+        <p>
+            Alternatively, the JAAS
+            configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab
+            directly in the processor properties. This will dynamically create a JAAS configuration like above, and
+            will take precedence over the java.security.auth.login.config system property.
+        </p>
+        <h4>SASL_PLAINTEXT - PLAIN</h4>
+        <p>
+            If the SASL mechanism is PLAIN, then client must provide a JAAS configuration to authenticate, but
+            the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+            be the following:
+        <pre>
+    KafkaClient {
+      org.apache.kafka.common.security.plain.PlainLoginModule required
+      username="nifi"
+      password="nifi-password";
+    };
+            </pre>
+        </p>
+        <p>
+            <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+            the username and password unencrypted.
+        </p>
+        <p>
+            <b>NOTE:</b> Using the PlainLoginModule will cause it be registered in the JVM's static list of Providers, making
+            it visible to components in other NARs that may access the providers. There is currently a known issue
+            where Kafka processors using the PlainLoginModule will cause HDFS processors with Keberos to no longer work.
+        </p>
+        <h3>SASL_SSL</h3>
+        <p>
+            This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+        <pre>
+    SASL_SSL://host.name:port
+            </pre>
+        </p>
+        <p>
+            See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+            depending on the SASL mechanism (GSSAPI or PLAIN).
+        </p>
+        <p>
+            See the SSL section for a description of how to configure the SSL Context Service based on the
+            ssl.client.auth property.
+        </p>
+
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0/additionalDetails.html
new file mode 100644
index 0000000..f206b0b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0/additionalDetails.html
@@ -0,0 +1,143 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>ConsumeKafka</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description</h2>
+        <p>
+            This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+            for data using KafkaConsumer API available with Kafka 1.0. When a message is received 
+            from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value 
+            of the Kafka message.
+        </p>
+
+
+        <h2>Security Configuration</h2>
+        <p>
+            The Security Protocol property allows the user to specify the protocol for communicating
+            with the Kafka broker. The following sections describe each of the protocols in further detail.
+        </p>
+        <h3>PLAINTEXT</h3>
+        <p>
+            This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+            In order to use this option the broker must be configured with a listener of the form:
+        <pre>
+    PLAINTEXT://host.name:port
+            </pre>
+        </p>
+        <h3>SSL</h3>
+        <p>
+            This option provides an encrypted connection to the broker, with optional client authentication. In order
+            to use this option the broker must be configured with a listener of the form:
+        <pre>
+    SSL://host.name:port
+            </pre>
+        In addition, the processor must have an SSL Context Service selected.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+            not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+            a truststore containing the public key of the certificate authority used to sign the broker's key.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+            In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+            a truststore as described above.
+        </p>
+        <h3>SASL_PLAINTEXT</h3>
+        <p>
+            This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+        <pre>
+    SASL_PLAINTEXT://host.name:port
+            </pre>
+        In addition, the Kerberos Service Name must be specified in the processor.
+        </p>
+        <h4>SASL_PLAINTEXT - GSSAPI</h4>
+        <p>
+            If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+            JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+            NiFi's bootstrap.conf, such as:
+        <pre>
+    java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+            </pre>
+        </p>
+        <p>
+            An example of the JAAS config file would be the following:
+        <pre>
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/path/to/nifi.keytab"
+        serviceName="kafka"
+        principal="nifi@YOURREALM.COM";
+    };
+            </pre>
+        <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+        </p>
+        <p>
+            Alternatively, the JAAS
+            configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab
+            directly in the processor properties. This will dynamically create a JAAS configuration like above, and
+            will take precedence over the java.security.auth.login.config system property.
+        </p>
+        <h4>SASL_PLAINTEXT - PLAIN</h4>
+        <p>
+            If the SASL mechanism is PLAIN, then client must provide a JAAS configuration to authenticate, but
+            the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+            be the following:
+        <pre>
+    KafkaClient {
+      org.apache.kafka.common.security.plain.PlainLoginModule required
+      username="nifi"
+      password="nifi-password";
+    };
+            </pre>
+        </p>
+        <p>
+            <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+            the username and password unencrypted.
+        </p>
+        <p>
+            <b>NOTE:</b> Using the PlainLoginModule will cause it be registered in the JVM's static list of Providers, making
+            it visible to components in other NARs that may access the providers. There is currently a known issue
+            where Kafka processors using the PlainLoginModule will cause HDFS processors with Keberos to no longer work.
+        </p>
+        <h3>SASL_SSL</h3>
+        <p>
+            This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+        <pre>
+    SASL_SSL://host.name:port
+            </pre>
+        </p>
+        <p>
+            See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+            depending on the SASL mechanism (GSSAPI or PLAIN).
+        </p>
+        <p>
+            See the SSL section for a description of how to configure the SSL Context Service based on the
+            ssl.client.auth property.
+        </p>
+
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0/additionalDetails.html
new file mode 100644
index 0000000..54b7786
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0/additionalDetails.html
@@ -0,0 +1,144 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PublishKafka</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description</h2>
+        <p>
+            This Processor puts the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available
+            with Kafka 1.0 API. The contents of the incoming FlowFile will be read using the
+            configured Record Reader. Each record will then be serialized using the configured
+            Record Writer, and this serialized form will be the content of a Kafka message.
+            This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
+        </p>
+        
+
+        <h2>Security Configuration</h2>
+        <p>
+            The Security Protocol property allows the user to specify the protocol for communicating
+            with the Kafka broker. The following sections describe each of the protocols in further detail.
+        </p>
+        <h3>PLAINTEXT</h3>
+        <p>
+            This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+            In order to use this option the broker must be configured with a listener of the form:
+            <pre>
+    PLAINTEXT://host.name:port
+            </pre>
+        </p>
+        <h3>SSL</h3>
+        <p>
+            This option provides an encrypted connection to the broker, with optional client authentication. In order
+            to use this option the broker must be configured with a listener of the form:
+            <pre>
+    SSL://host.name:port
+            </pre>
+            In addition, the processor must have an SSL Context Service selected.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+            not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+            a truststore containing the public key of the certificate authority used to sign the broker's key.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+            In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+            a truststore as described above.
+        </p>
+        <h3>SASL_PLAINTEXT</h3>
+        <p>
+            This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+    SASL_PLAINTEXT://host.name:port
+            </pre>
+            In addition, the Kerberos Service Name must be specified in the processor.
+        </p>
+        <h4>SASL_PLAINTEXT - GSSAPI</h4>
+        <p>
+            If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+            JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+            NiFi's bootstrap.conf, such as:
+            <pre>
+    java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+            </pre>
+        </p>
+        <p>
+            An example of the JAAS config file would be the following:
+            <pre>
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/path/to/nifi.keytab"
+        serviceName="kafka"
+        principal="nifi@YOURREALM.COM";
+    };
+            </pre>
+        <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+        </p>
+        <p>
+            Alternatively, the JAAS
+            configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab
+            directly in the processor properties. This will dynamically create a JAAS configuration like above, and
+            will take precedence over the java.security.auth.login.config system property.
+        </p>
+        <h4>SASL_PLAINTEXT - PLAIN</h4>
+        <p>
+            If the SASL mechanism is PLAIN, then client must provide a JAAS configuration to authenticate, but
+            the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+            be the following:
+            <pre>
+    KafkaClient {
+      org.apache.kafka.common.security.plain.PlainLoginModule required
+      username="nifi"
+      password="nifi-password";
+    };
+            </pre>
+        </p>
+        <p>
+            <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+            the username and password unencrypted.
+        </p>
+        <p>
+            <b>NOTE:</b> Using the PlainLoginModule will cause it be registered in the JVM's static list of Providers, making
+            it visible to components in other NARs that may access the providers. There is currently a known issue
+            where Kafka processors using the PlainLoginModule will cause HDFS processors with Keberos to no longer work.
+        </p>
+        <h3>SASL_SSL</h3>
+        <p>
+            This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+    SASL_SSL://host.name:port
+            </pre>
+        </p>
+        <p>
+            See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+            depending on the SASL mechanism (GSSAPI or PLAIN).
+        </p>
+        <p>
+            See the SSL section for a description of how to configure the SSL Context Service based on the
+            ssl.client.auth property.
+        </p>
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0/additionalDetails.html
new file mode 100644
index 0000000..7d68fe0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0/additionalDetails.html
@@ -0,0 +1,156 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PublishKafka</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description</h2>
+        <p>
+            This Processor puts the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available
+            with Kafka 1.0 API. The content of a FlowFile becomes the contents of a Kafka message.
+            This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
+        </p>
+
+        <p>
+            The Processor allows the user to configure an optional Message Demarcator that
+            can be used to send many messages per FlowFile. For example, a <i>\n</i> could be used
+            to indicate that the contents of the FlowFile should be used to send one message
+            per line of text. It also supports multi-char demarcators (e.g., 'my custom demarcator').
+            If the property is not set, the entire contents of the FlowFile
+            will be sent as a single message. When using the demarcator, if some messages are
+            successfully sent but other messages fail to send, the resulting FlowFile will be
+            considered a failed FlowFile and will have additional attributes to that effect.
+            One of such attributes is 'failed.last.idx' which indicates the index of the last message
+            that was successfully ACKed by Kafka. (if no demarcator is used the value of this index will be -1).
+            This will allow PublishKafka to only re-send un-ACKed messages on the next re-try.
+        </p>
+        
+        
+        <h2>Security Configuration</h2>
+        <p>
+            The Security Protocol property allows the user to specify the protocol for communicating
+            with the Kafka broker. The following sections describe each of the protocols in further detail.
+        </p>
+        <h3>PLAINTEXT</h3>
+        <p>
+            This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+            In order to use this option the broker must be configured with a listener of the form:
+            <pre>
+    PLAINTEXT://host.name:port
+            </pre>
+        </p>
+        <h3>SSL</h3>
+        <p>
+            This option provides an encrypted connection to the broker, with optional client authentication. In order
+            to use this option the broker must be configured with a listener of the form:
+            <pre>
+    SSL://host.name:port
+            </pre>
+            In addition, the processor must have an SSL Context Service selected.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+            not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+            a truststore containing the public key of the certificate authority used to sign the broker's key.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+            In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+            a truststore as described above.
+        </p>
+        <h3>SASL_PLAINTEXT</h3>
+        <p>
+            This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+    SASL_PLAINTEXT://host.name:port
+            </pre>
+            In addition, the Kerberos Service Name must be specified in the processor.
+        </p>
+        <h4>SASL_PLAINTEXT - GSSAPI</h4>
+        <p>
+            If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+            JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+            NiFi's bootstrap.conf, such as:
+            <pre>
+    java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+            </pre>
+        </p>
+        <p>
+            An example of the JAAS config file would be the following:
+            <pre>
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/path/to/nifi.keytab"
+        serviceName="kafka"
+        principal="nifi@YOURREALM.COM";
+    };
+            </pre>
+        <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+        </p>
+        <p>
+            Alternatively, the JAAS
+            configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab
+            directly in the processor properties. This will dynamically create a JAAS configuration like above, and
+            will take precedence over the java.security.auth.login.config system property.
+        </p>
+        <h4>SASL_PLAINTEXT - PLAIN</h4>
+        <p>
+            If the SASL mechanism is PLAIN, then client must provide a JAAS configuration to authenticate, but
+            the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+            be the following:
+            <pre>
+    KafkaClient {
+      org.apache.kafka.common.security.plain.PlainLoginModule required
+      username="nifi"
+      password="nifi-password";
+    };
+            </pre>
+        </p>
+        <p>
+            <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+            the username and password unencrypted.
+        </p>
+        <p>
+            <b>NOTE:</b> Using the PlainLoginModule will cause it be registered in the JVM's static list of Providers, making
+            it visible to components in other NARs that may access the providers. There is currently a known issue
+            where Kafka processors using the PlainLoginModule will cause HDFS processors with Keberos to no longer work.
+        </p>
+        <h3>SASL_SSL</h3>
+        <p>
+            This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+    SASL_SSL://host.name:port
+            </pre>
+        </p>
+        <p>
+            See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+            depending on the SASL mechanism (GSSAPI or PLAIN).
+        </p>
+        <p>
+            See the SSL section for a description of how to configure the SSL Context Service based on the
+            ssl.client.auth property.
+        </p>
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
new file mode 100644
index 0000000..7b5a8fc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConsumeKafkaTest {
+
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
+
+    @Before
+    public void setup() {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+    }
+
+    @Test
+    public void validateCustomValidatorSettings() throws Exception {
+        ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
+        runner.assertNotValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafka_1_0.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because Group ID is required"));
+        }
+
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void testJaasConfiguration() throws Exception {
+        ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.assertValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.assertValid();
+    }
+
+}


Mime
View raw message