nifi-commits mailing list archives

From joew...@apache.org
Subject [5/6] nifi git commit: NIFI-4600: This closes #2312. Added nifi-kafka-1-0-nar and nifi-kafka-1-0-processors modules
Date Mon, 04 Dec 2017 21:52:47 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
new file mode 100644
index 0000000..0f51298
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
@@ -0,0 +1,92 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-kafka-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>nifi-kafka-1-0-processors</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka1.0.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka1.0.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <!-- Transitive dependencies excluded because they are located
+                in a legacy Maven repository, which Maven 3 doesn't support. -->
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
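
The test-scoped nifi-mock dependency above pulls in NiFi's TestRunner framework, which is how processors in this bundle can be exercised without a live broker. As a rough, non-authoritative sketch (the property constants are the ones defined on ConsumeKafka_1_0 later in this commit; the broker address, topic, and group id are made-up placeholders, and JUnit 4 is assumed to be available from the parent build), a minimal validation-style test might look like this:

package org.apache.nifi.processors.kafka.pubsub;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class ConsumeKafkaValidationSketch {

    @Test
    public void requiredPropertiesMakeProcessorValid() {
        // Build a mock runner around the processor; no Kafka broker is contacted here.
        final TestRunner runner = TestRunners.newTestRunner(ConsumeKafka_1_0.class);

        // Placeholder values for the required properties (not taken from the commit).
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:9092");
        runner.setProperty(ConsumeKafka_1_0.TOPICS, "my-topic");
        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "nifi-consumer-group");
        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST.getValue());

        // With the remaining properties left at their defaults, the processor is expected to validate.
        runner.assertValid();
    }
}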

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
new file mode 100644
index 0000000..f0c1bd0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 1.0 Consumer API. "
+    + "The complementary NiFi processor for sending messages is PublishKafkaRecord_1_0. Please note that, at this time, the Processor assumes that "
+    + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
+    + "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
+    + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. "
+    + "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile. No two Kafka messages will be placed into the same FlowFile if they "
+    + "have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property.")
+@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "1.0"})
+@WritesAttributes({
+    @WritesAttribute(attribute = "record.count", description = "The number of records received"),
+    @WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+        description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+@SeeAlso({ConsumeKafka_1_0.class, PublishKafka_1_0.class, PublishKafkaRecord_1_0.class})
+public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
+
+    static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
+    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
+    static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
+    static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
+    static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
+
+    static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
+            .name("topic")
+            .displayName("Topic Name(s)")
+            .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
+            .name("topic_type")
+            .displayName("Topic Name Format")
+            .description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
+            .required(true)
+            .allowableValues(TOPIC_NAME, TOPIC_PATTERN)
+            .defaultValue(TOPIC_NAME.getValue())
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("The Record Reader to use for incoming FlowFiles")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use in order to serialize the data before sending to Kafka")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
+        .name("group.id")
+            .displayName("Group ID")
+            .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
+        .name("auto.offset.reset")
+            .displayName("Offset Reset")
+            .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any "
+                    + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
+            .required(true)
+            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
+            .defaultValue(OFFSET_LATEST.getValue())
+            .build();
+
+    static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
+            .name("max.poll.records")
+            .displayName("Max Poll Records")
+            .description("Specifies the maximum number of records Kafka should return in a single poll.")
+            .required(false)
+            .defaultValue("10000")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+            .name("max-uncommit-offset-wait")
+            .displayName("Max Uncommitted Time")
+            .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+                    + "This value impacts how often offsets will be committed.  Committing offsets less often increases "
+                    + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+                    + "or JVM restart between commits.  This value is also related to maximum poll records and the use "
+                    + "of a message demarcator.  When using a message demarcator we can have far more uncommitted messages "
+                    + "than when we're not as there is much less for us to keep track of in memory.")
+            .required(false)
+            .defaultValue("1 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+    static final PropertyDescriptor HONOR_TRANSACTIONS = new PropertyDescriptor.Builder()
+        .name("honor-transactions")
+        .displayName("Honor Transactions")
+        .description("Specifies whether or not NiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of "
+            + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If "
+            + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait "
+            + "for the producer to finish its entire transaction instead of pulling as the messages become available.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
+        .name("message-header-encoding")
+        .displayName("Message Header Encoding")
+        .description("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. "
+            + "This property indicates the Character Encoding to use for deserializing the headers.")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .required(false)
+        .build();
+    static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
+        .name("header-name-regex")
+        .displayName("Headers to Add as Attributes (Regex)")
+        .description("A Regular Expression that is matched against all message headers. "
+            + "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. "
+            + "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by "
+            + "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like "
+            + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
+            + "the messages together efficiently.")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(false)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles received from Kafka.  Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
+            .build();
+    static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
+            .name("parse.failure")
+            .description("If a message from Kafka cannot be parsed using the configured Record Reader, the contents of the "
+                + "message will be routed to this Relationship as its own individual FlowFile.")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS;
+    static final Set<Relationship> RELATIONSHIPS;
+
+    private volatile ConsumerPool consumerPool = null;
+    private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());
+
+    static {
+        List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        descriptors.add(TOPICS);
+        descriptors.add(TOPIC_TYPE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(HONOR_TRANSACTIONS);
+        descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
+        descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
+        descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
+        descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        descriptors.add(GROUP_ID);
+        descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(MESSAGE_HEADER_ENCODING);
+        descriptors.add(HEADER_NAME_REGEX);
+        descriptors.add(MAX_POLL_RECORDS);
+        descriptors.add(MAX_UNCOMMITTED_TIME);
+        DESCRIPTORS = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_PARSE_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(rels);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnStopped
+    public void close() {
+        final ConsumerPool pool = consumerPool;
+        consumerPool = null;
+
+        if (pool != null) {
+            pool.close();
+        }
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+    }
+
+    private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
+        ConsumerPool pool = consumerPool;
+        if (pool != null) {
+            return pool;
+        }
+
+        return consumerPool = createConsumerPool(context, getLogger());
+    }
+
+    protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+        final int maxLeases = context.getMaxConcurrentTasks();
+        final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final Map<String, Object> props = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        final String topicListing = context.getProperty(ConsumeKafkaRecord_1_0.TOPICS).evaluateAttributeExpressions().getValue();
+        final String topicType = context.getProperty(ConsumeKafkaRecord_1_0.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
+        final List<String> topics = new ArrayList<>();
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean honorTransactions = context.getProperty(HONOR_TRANSACTIONS).asBoolean();
+
+        final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
+        final Charset charset = Charset.forName(charsetName);
+
+        final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
+        final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
+
+        if (topicType.equals(TOPIC_NAME.getValue())) {
+            for (final String topic : topicListing.split(",", 100)) {
+                final String trimmedName = topic.trim();
+                if (!trimmedName.isEmpty()) {
+                    topics.add(trimmedName);
+                }
+            }
+
+            return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+        } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
+            final Pattern topicPattern = Pattern.compile(topicListing.trim());
+            return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+        } else {
+            getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
+            return null;
+        }
+    }
+
+    @OnUnscheduled
+    public void interruptActiveThreads() {
+        // There are known issues with the Kafka client library that result in the client code hanging
+        // indefinitely when unable to communicate with the broker. In order to address this, we will wait
+        // up to 5 seconds for the threads to finish and then will call Consumer.wakeup() to trigger the
+        // thread to wake up when it is blocked, waiting on a response.
+        final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
+        final long start = System.nanoTime();
+        while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) {
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+        if (!activeLeases.isEmpty()) {
+            int count = 0;
+            for (final ConsumerLease lease : activeLeases) {
+                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
+                lease.wakeup();
+                count++;
+            }
+
+            getLogger().info("Woke up {} consumers", new Object[] {count});
+        }
+
+        activeLeases.clear();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final ConsumerPool pool = getConsumerPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }
+
+        try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
+            if (lease == null) {
+                context.yield();
+                return;
+            }
+
+            activeLeases.add(lease);
+            try {
+                while (this.isScheduled() && lease.continuePolling()) {
+                    lease.poll();
+                }
+                if (this.isScheduled() && !lease.commit()) {
+                    context.yield();
+                }
+            } catch (final WakeupException we) {
+                getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+                    + "Will roll back session and discard any partially received data.", new Object[] {lease});
+            } catch (final KafkaException kex) {
+                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+                        new Object[]{lease, kex}, kex);
+            } catch (final Throwable t) {
+                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+                        new Object[]{lease, t}, t);
+            } finally {
+                activeLeases.remove(lease);
+            }
+        }
+    }
+}
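
For readers less familiar with the consumer settings that createConsumerPool() pins above (auto-commit disabled and byte-array deserializers for both key and value, with offsets committed explicitly once the NiFi session commits), a stripped-down kafka-clients equivalent outside of NiFi would look roughly like the sketch below; the broker address, group id, and topic name are placeholders, not values from this commit:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RawConsumerSketch {
    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "nifi-consumer-group");       // placeholder group
        // Mirrors the processor: offsets are committed by the caller, never auto-committed.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));          // placeholder topic
            // ConsumerLease.poll() uses the same short timeout (10 ms) and loops; one poll is shown here.
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(10);
            System.out.println("Received " + records.count() + " raw messages");
            // With auto-commit off, the caller commits explicitly, e.g. via consumer.commitSync(offsets).
        }
    }
}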

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
new file mode 100644
index 0000000..fdc2cb3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
+@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 1.0 Consumer API. "
+    + "The complementary NiFi processor for sending messages is PublishKafka_1_0.")
+@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "1.0"})
+@WritesAttributes({
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+            + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+        description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+public class ConsumeKafka_1_0 extends AbstractProcessor {
+
+    static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
+
+    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
+
+    static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
+
+    static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
+
+    static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
+
+    static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
+            .name("topic")
+            .displayName("Topic Name(s)")
+            .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
+            .name("topic_type")
+            .displayName("Topic Name Format")
+            .description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
+            .required(true)
+            .allowableValues(TOPIC_NAME, TOPIC_PATTERN)
+            .defaultValue(TOPIC_NAME.getValue())
+            .build();
+
+    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
+            .name(ConsumerConfig.GROUP_ID_CONFIG)
+            .displayName("Group ID")
+            .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
+            .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+            .displayName("Offset Reset")
+            .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any "
+                    + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
+            .required(true)
+            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
+            .defaultValue(OFFSET_LATEST.getValue())
+            .build();
+
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+            .name("message-demarcator")
+            .displayName("Message Demarcator")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains "
+                    + "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use "
+                    + "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received "
+                    + "will result in a single FlowFile which  "
+                    + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
+            .build();
+    static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
+        .name("header-name-regex")
+        .displayName("Headers to Add as Attributes (Regex)")
+        .description("A Regular Expression that is matched against all message headers. "
+            + "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. "
+            + "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by "
+            + "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like "
+            + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
+            + "the messages together efficiently.")
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(false)
+        .build();
+
+    static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
+            .name("max.poll.records")
+            .displayName("Max Poll Records")
+            .description("Specifies the maximum number of records Kafka should return in a single poll.")
+            .required(false)
+            .defaultValue("10000")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+            .name("max-uncommit-offset-wait")
+            .displayName("Max Uncommitted Time")
+            .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+                    + "This value impacts how often offsets will be committed.  Committing offsets less often increases "
+                    + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+                    + "or JVM restart between commits.  This value is also related to maximum poll records and the use "
+                    + "of a message demarcator.  When using a message demarcator we can have far more uncommitted messages "
+                    + "than when we're not as there is much less for us to keep track of in memory.")
+            .required(false)
+            .defaultValue("1 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor HONOR_TRANSACTIONS = new PropertyDescriptor.Builder()
+        .name("honor-transactions")
+        .displayName("Honor Transactions")
+        .description("Specifies whether or not NiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of "
+            + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If "
+            + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait "
+            + "for the producer to finish its entire transaction instead of pulling as the messages become available.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
+        .name("message-header-encoding")
+        .displayName("Message Header Encoding")
+        .description("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. "
+            + "This property indicates the Character Encoding to use for deserializing the headers.")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .required(false)
+        .build();
+
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
+        .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS;
+    static final Set<Relationship> RELATIONSHIPS;
+
+    private volatile ConsumerPool consumerPool = null;
+    private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());
+
+    static {
+        List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        descriptors.add(TOPICS);
+        descriptors.add(TOPIC_TYPE);
+        descriptors.add(HONOR_TRANSACTIONS);
+        descriptors.add(GROUP_ID);
+        descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
+        descriptors.add(MESSAGE_DEMARCATOR);
+        descriptors.add(MESSAGE_HEADER_ENCODING);
+        descriptors.add(HEADER_NAME_REGEX);
+        descriptors.add(MAX_POLL_RECORDS);
+        descriptors.add(MAX_UNCOMMITTED_TIME);
+        DESCRIPTORS = Collections.unmodifiableList(descriptors);
+        RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnStopped
+    public void close() {
+        final ConsumerPool pool = consumerPool;
+        consumerPool = null;
+        if (pool != null) {
+            pool.close();
+        }
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
+                .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+    }
+
+    private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
+        ConsumerPool pool = consumerPool;
+        if (pool != null) {
+            return pool;
+        }
+
+        return consumerPool = createConsumerPool(context, getLogger());
+    }
+
+    protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+        final int maxLeases = context.getMaxConcurrentTasks();
+        final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final byte[] demarcator = context.getProperty(ConsumeKafka_1_0.MESSAGE_DEMARCATOR).isSet()
+                ? context.getProperty(ConsumeKafka_1_0.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+                : null;
+        final Map<String, Object> props = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        final String topicListing = context.getProperty(ConsumeKafka_1_0.TOPICS).evaluateAttributeExpressions().getValue();
+        final String topicType = context.getProperty(ConsumeKafka_1_0.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
+        final List<String> topics = new ArrayList<>();
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final boolean honorTransactions = context.getProperty(HONOR_TRANSACTIONS).asBoolean();
+
+        final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
+        final Charset charset = Charset.forName(charsetName);
+
+        final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
+        final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
+
+        if (topicType.equals(TOPIC_NAME.getValue())) {
+            for (final String topic : topicListing.split(",", 100)) {
+                final String trimmedName = topic.trim();
+                if (!trimmedName.isEmpty()) {
+                    topics.add(trimmedName);
+                }
+            }
+
+            return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+        } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
+            final Pattern topicPattern = Pattern.compile(topicListing.trim());
+            return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+        } else {
+            getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
+            return null;
+        }
+    }
+
+    @OnUnscheduled
+    public void interruptActiveThreads() {
+        // There are known issues with the Kafka client library that result in the client code hanging
+        // indefinitely when unable to communicate with the broker. In order to address this, we will wait
+        // up to 5 seconds for the threads to finish and then will call Consumer.wakeup() to trigger the
+        // thread to wake up when it is blocked, waiting on a response.
+        final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
+        final long start = System.nanoTime();
+        while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) {
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+        if (!activeLeases.isEmpty()) {
+            int count = 0;
+            for (final ConsumerLease lease : activeLeases) {
+                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
+                lease.wakeup();
+                count++;
+            }
+
+            getLogger().info("Woke up {} consumers", new Object[] {count});
+        }
+
+        activeLeases.clear();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final ConsumerPool pool = getConsumerPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }
+
+        try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
+            if (lease == null) {
+                context.yield();
+                return;
+            }
+
+            activeLeases.add(lease);
+            try {
+                while (this.isScheduled() && lease.continuePolling()) {
+                    lease.poll();
+                }
+                if (this.isScheduled() && !lease.commit()) {
+                    context.yield();
+                }
+            } catch (final WakeupException we) {
+                getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+                    + "Will roll back session and discard any partially received data.", new Object[] {lease});
+            } catch (final KafkaException kex) {
+                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+                        new Object[]{lease, kex}, kex);
+            } catch (final Throwable t) {
+                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+                        new Object[]{lease, t}, t);
+            } finally {
+                activeLeases.remove(lease);
+            }
+        }
+    }
+}
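
Both consumers expose createConsumerPool(...) as a protected factory method, which is what keeps onTrigger(...) testable without reaching a real broker. A hypothetical test-only subclass (an illustration, not part of this commit) could override it and hand back a pre-built pool, so a test can script what the leases do:

package org.apache.nifi.processors.kafka.pubsub;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;

/**
 * Illustrative sketch only: bypasses the real Kafka connection by returning a
 * pre-built (for example, mocked) ConsumerPool instead of constructing one from
 * the processor configuration.
 */
public class StubbedConsumeKafka extends ConsumeKafka_1_0 {

    private final ConsumerPool stubPool;

    public StubbedConsumeKafka(final ConsumerPool stubPool) {
        this.stubPool = stubPool;
    }

    @Override
    protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
        // onTrigger() obtains its ConsumerLease instances from whatever pool this
        // returns, so a test can script the lease's poll/commit behavior.
        return stubPool;
    }
}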

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
new file mode 100644
index 0000000..a2a449c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * This class represents a lease to access a Kafka Consumer object. The lease is
+ * intended to be obtained from a ConsumerPool. The lease is closeable to allow
+ * for the clean model of a try w/resources whereby non-exceptional cases mean
+ * the lease will be returned to the pool for future use by others. A given
+ * lease may only belong to a single thread at a time.
+ */
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+    private final long maxWaitMillis;
+    private final Consumer<byte[], byte[]> kafkaConsumer;
+    private final ComponentLog logger;
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
+    private final RecordSetWriterFactory writerFactory;
+    private final RecordReaderFactory readerFactory;
+    private final Charset headerCharacterSet;
+    private final Pattern headerNamePattern;
+    private boolean poisoned = false;
+    //used for tracking demarcated flowfiles to their TopicPartition so we can append
+    //to them on subsequent poll calls
+    private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>();
+    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+    private long leaseStartNanos = -1;
+    private boolean lastPollEmpty = false;
+    private int totalMessages = 0;
+
+    ConsumerLease(
+            final long maxWaitMillis,
+            final Consumer<byte[], byte[]> kafkaConsumer,
+            final byte[] demarcatorBytes,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final ComponentLog logger,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.maxWaitMillis = maxWaitMillis;
+        this.kafkaConsumer = kafkaConsumer;
+        this.demarcatorBytes = demarcatorBytes;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.logger = logger;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    /**
+     * Clears out internal state elements, excluding the session and consumer, as
+     * those are managed by the pool itself
+     */
+    private void resetInternalState() {
+        bundleMap.clear();
+        uncommittedOffsetsMap.clear();
+        leaseStartNanos = -1;
+        lastPollEmpty = false;
+        totalMessages = 0;
+    }
+
+    /**
+     * Kafka will call this method whenever it is about to rebalance the
+     * consumers for the given partitions. We simply take this to mean that we
+     * need to quickly commit what we've got. This method is called during the
+     * poll() call of this class and, according to the Kafka API docs, by the
+     * same thread that calls poll(). After this method executes, the NiFi
+     * session and the Kafka offsets have been committed.
+     *
+     * @param partitions the partitions being revoked
+     */
+    @Override
+    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        //force a commit here. We can reuse the session and consumer after this, but we must commit now to avoid duplicates if Kafka reassigns the partitions
+        commit();
+    }
+
+    /**
+     * This will be called by Kafka when the rebalance has completed. We don't
+     * need to do anything with this information other than optionally log it,
+     * since by this point we've already committed what we had and moved on.
+     *
+     * @param partitions the set of topic partitions being assigned
+     */
+    @Override
+    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+    }
+
+    /**
+     * Executes a poll on the underlying Kafka Consumer and creates any new
+     * flowfiles necessary or appends to existing ones if in demarcation mode.
+     */
+    void poll() {
+        /**
+         * Implementation note:
+         * Even if ConsumeKafka is not scheduled to poll for longer than session.timeout.ms (defaults to 10 sec),
+         * for example because downstream back-pressure is engaged, the Kafka consumer still sends heartbeats from a
+         * background thread. If the situation lasts longer than max.poll.interval.ms (defaults to 5 min), the Kafka
+         * consumer sends a Leave Group request to the Group Coordinator. When the ConsumeKafka processor is scheduled
+         * again, the Kafka client checks whether this instance is still part of the consumer group and, if not,
+         * rejoins before polling messages. This behavior was introduced by KIP-62 and is available from Kafka client 0.10.1.0 onward.
+         */
+        try {
+            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+            lastPollEmpty = records.count() == 0;
+            processRecords(records);
+        } catch (final ProcessException pe) {
+            throw pe;
+        } catch (final Throwable t) {
+            this.poison();
+            throw t;
+        }
+    }
+
+    /**
+     * Commits the NiFi session and then notifies Kafka to commit the offsets
+     * for all topic/partition pairs with uncommitted data, along with their
+     * metadata. Committing in batches like this allows the Kafka client to
+     * collect more data from Kafka before committing the offsets, which can
+     * offer higher performance than committing after every poll.
+     *
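+     * <p>A sketch of how a caller is expected to handle the return value
+     * ({@code context} here stands for the processor's ProcessContext and is an
+     * assumption for illustration):
+     * <pre>{@code
+     * if (!lease.commit()) {
+     *     context.yield();
+     * }
+     * }</pre>
+     *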
+     * @return false if there was nothing to commit, in which case the caller
+     *         should probably yield; true if new data was committed
+     */
+    boolean commit() {
+        if (uncommittedOffsetsMap.isEmpty()) {
+            resetInternalState();
+            return false;
+        }
+        try {
+            /**
+             * Committing the NiFi session and then the Kafka offsets means we have
+             * an at-least-once guarantee here. If we reversed the order, we'd have
+             * at-most-once.
+             */
+            final Collection<FlowFile> bundledFlowFiles = getBundles();
+            if (!bundledFlowFiles.isEmpty()) {
+                getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+            }
+            getProcessSession().commit();
+
+            final Map<TopicPartition, OffsetAndMetadata> offsetsMap = uncommittedOffsetsMap;
+            kafkaConsumer.commitSync(offsetsMap);
+            resetInternalState();
+            return true;
+        } catch (final IOException ioe) {
+            poison();
+            logger.error("Failed to finish writing out FlowFile bundle", ioe);
+            throw new ProcessException(ioe);
+        } catch (final KafkaException kex) {
+            poison();
+            logger.warn("Duplicates are likely as we were able to commit the process"
+                    + " session but received an exception from Kafka while committing"
+                    + " offsets.");
+            throw kex;
+        } catch (final Throwable t) {
+            poison();
+            throw t;
+        }
+    }
+
+    /**
+     * Indicates whether we should continue polling for data. If we are not
+     * writing data with a demarcator, then we're writing individual flow files
+     * per Kafka message, so we must be very mindful of memory usage for the
+     * flow file objects (not their content) being held in memory. The content
+     * of Kafka messages will be written to the content repository immediately
+     * upon each poll call, but we must still be mindful of how much memory can
+     * be used in each poll call. We will indicate that we should stop polling
+     * if our last poll call produced no new results, if we've been polling and
+     * processing data longer than the specified maximum polling time, if we
+     * have reached our specified max flow file limit, or if a rebalance has
+     * been initiated for one of the partitions we're watching; otherwise we
+     * indicate that polling should continue.
+     *
+     * @return true if should keep polling; false otherwise
+     */
+    boolean continuePolling() {
+        //stop if the last poll produced no new data
+        if (lastPollEmpty) {
+            return false;
+        }
+
+        //stop if we've gone past our desired max uncommitted wait time
+        if (leaseStartNanos < 0) {
+            leaseStartNanos = System.nanoTime();
+        }
+        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        if (durationMillis > maxWaitMillis) {
+            return false;
+        }
+
+        //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+        if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+            return false;
+        } else {
+            return totalMessages < 1000; //admittedly a magic number - a good candidate for a processor property
+        }
+    }
+
+    /**
+     * Indicates that the underlying session and consumer should be immediately
+     * considered invalid. Once the lease is closed, the session will be rolled
+     * back and the pool should destroy the underlying consumer. This is useful
+     * if, for external reasons such as the processor no longer being scheduled,
+     * this lease should be terminated immediately.
+     */
+    private void poison() {
+        poisoned = true;
+    }
+
+    /**
+     * @return true if this lease has been poisoned; false otherwise
+     */
+    boolean isPoisoned() {
+        return poisoned;
+    }
+
+    /**
+     * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
+     */
+    public void wakeup() {
+        kafkaConsumer.wakeup();
+    }
+
+    /**
+     * Intended to be overridden by the pool that created this ConsumerLease
+     * object. The override should ensure that the session used to create this
+     * lease is rolled back and that the underlying Kafka consumer is either
+     * returned to the pool for continued use or destroyed if this lease has
+     * been poisoned. It may only be called once; calling it more than once can
+     * result in undefined and non-thread-safe behavior.
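+     *
+     * <p>A sketch of what an overriding pool implementation might do. The
+     * {@code pool} field and its methods are hypothetical, shown only to
+     * illustrate the contract described above:
+     * <pre>{@code
+     * public void close() {
+     *     super.close();                  // clears this lease's internal state
+     *     getProcessSession().rollback(); // discard anything not yet committed
+     *     if (isPoisoned()) {
+     *         pool.destroyConsumer(this); // hypothetical pool method
+     *     } else {
+     *         pool.returnConsumer(this);  // hypothetical pool method
+     *     }
+     * }
+     * }</pre>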
+     */
+    @Override
+    public void close() {
+        resetInternalState();
+    }
+
+    public abstract ProcessSession getProcessSession();
+
+    public abstract void yield();
+
+    private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+        records.partitions().stream().forEach(partition -> {
+            List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+            if (!messages.isEmpty()) {
+                //update maximum offset map for this topic partition
+                long maxOffset = messages.stream()
+                        .mapToLong(record -> record.offset())
+                        .max()
+                        .getAsLong();
+
+                //write records to content repository and session
+                if (demarcatorBytes != null) {
+                    writeDemarcatedData(getProcessSession(), messages, partition);
+                } else if (readerFactory != null && writerFactory != null) {
+                    writeRecordData(getProcessSession(), messages, partition);
+                } else {
+                    messages.stream().forEach(message -> {
+                        writeData(getProcessSession(), message, partition);
+                    });
+                }
+
+                totalMessages += messages.size();
+                uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+            }
+        });
+    }
+
+    private static String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null;  // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
+    private Collection<FlowFile> getBundles() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        for (final BundleTracker tracker : bundleMap.values()) {
+            final boolean includeBundle = processBundle(tracker);
+            if (includeBundle) {
+                flowFiles.add(tracker.flowFile);
+            }
+        }
+        return flowFiles;
+    }
+
+    private boolean processBundle(final BundleTracker bundle) throws IOException {
+        final RecordSetWriter writer = bundle.recordWriter;
+        if (writer != null) {
+            final WriteResult writeResult;
+
+            try {
+                writeResult = writer.finishRecordSet();
+            } finally {
+                writer.close();
+            }
+
+            if (writeResult.getRecordCount() == 0) {
+                getProcessSession().remove(bundle.flowFile);
+                return false;
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.putAll(writeResult.getAttributes());
+            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+            bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
+        }
+
+        populateAttributes(bundle);
+        return true;
+    }
+
+    private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+        FlowFile flowFile = session.create();
+        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+        tracker.incrementRecordCount(1);
+        final byte[] value = record.value();
+        if (value != null) {
+            flowFile = session.write(flowFile, out -> {
+                out.write(value);
+            });
+        }
+        flowFile = session.putAllAttributes(flowFile, getAttributes(record));
+        tracker.updateFlowFile(flowFile);
+        populateAttributes(tracker);
+        session.transfer(tracker.flowFile, REL_SUCCESS);
+    }
+
+    private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+        // Group the Records by their BundleInformation
+        final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream()
+            .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec))));
+
+        for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
+            final BundleInformation bundleInfo = entry.getKey();
+            final List<ConsumerRecord<byte[], byte[]>> recordList = entry.getValue();
+
+            final boolean demarcateFirstRecord;
+
+            BundleTracker tracker = bundleMap.get(bundleInfo);
+
+            FlowFile flowFile;
+            if (tracker == null) {
+                tracker = new BundleTracker(recordList.get(0), topicPartition, keyEncoding);
+                flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, bundleInfo.attributes);
+                tracker.updateFlowFile(flowFile);
+                demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+            } else {
+                demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+            }
+            flowFile = tracker.flowFile;
+
+            tracker.incrementRecordCount(recordList.size());
+            flowFile = session.append(flowFile, out -> {
+                boolean useDemarcator = demarcateFirstRecord;
+                for (final ConsumerRecord<byte[], byte[]> record : recordList) {
+                    if (useDemarcator) {
+                        out.write(demarcatorBytes);
+                    }
+                    final byte[] value = record.value();
+                    if (value != null) {
+                        out.write(record.value());
+                    }
+                    useDemarcator = true;
+                }
+            });
+
+            tracker.updateFlowFile(flowFile);
+            bundleMap.put(bundleInfo, tracker);
+        }
+    }
+
+    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
+        handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
+            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+    }
+
+    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
+        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+        final Map<String, String> attributes = getAttributes(consumerRecord);
+        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+
+        FlowFile failureFlowFile = session.create();
+
+        final byte[] value = consumerRecord.value();
+        if (value != null) {
+            failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
+        }
+        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+        if (cause == null) {
+            logger.error(message);
+        } else {
+            logger.error(message, cause);
+        }
+
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private Map<String, String> getAttributes(final ConsumerRecord<?, ?> consumerRecord) {
+        final Map<String, String> attributes = new HashMap<>();
+        if (headerNamePattern == null) {
+            return attributes;
+        }
+
+        for (final Header header : consumerRecord.headers()) {
+            final String attributeName = header.key();
+            if (headerNamePattern.matcher(attributeName).matches()) {
+                attributes.put(attributeName, new String(header.value(), headerCharacterSet));
+            }
+        }
+
+        return attributes;
+    }
+
+    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+        // For each Kafka message, create a RecordReader over the message value and group the parsed
+        // records into bundles keyed by topic/partition, schema, and header-derived attributes so that
+        // records sharing a bundle are written to the same FlowFile. Messages that cannot be parsed or
+        // written are routed individually to the 'parse.failure' relationship.
+        RecordSetWriter writer = null;
+        try {
+            for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+                final Map<String, String> attributes = getAttributes(consumerRecord);
+
+                final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                    final RecordReader reader;
+
+                    try {
+                        reader = readerFactory.createRecordReader(attributes, in, logger);
+                    } catch (final Exception e) {
+                        handleParseFailure(consumerRecord, session, e);
+                        continue;
+                    }
+
+                    Record record;
+                    while ((record = reader.nextRecord()) != null) {
+                        // Determine the bundle for this record.
+                        final RecordSchema recordSchema = record.getSchema();
+                        final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+                        BundleTracker tracker = bundleMap.get(bundleInfo);
+                        if (tracker == null) {
+                            FlowFile flowFile = session.create();
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+
+                            final OutputStream rawOut = session.write(flowFile);
+
+                            final RecordSchema writeSchema;
+                            try {
+                                writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                            } catch (final Exception e) {
+                                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                                try {
+                                    rollback(topicPartition);
+                                } catch (final Exception rollbackException) {
+                                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                                }
+
+                                yield();
+                                throw new ProcessException(e);
+                            }
+
+                            writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                            writer.beginRecordSet();
+
+                            tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                            tracker.updateFlowFile(flowFile);
+                            bundleMap.put(bundleInfo, tracker);
+                        } else {
+                            writer = tracker.recordWriter;
+                        }
+
+                        try {
+                            writer.write(record);
+                        } catch (final RuntimeException re) {
+                            handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                                + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                            continue;
+                        }
+
+                        tracker.incrementRecordCount(1L);
+                        session.adjustCounter("Records Received", records.size(), false);
+                    }
+                }
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+
+            try {
+                if (writer != null) {
+                    writer.close();
+                }
+            } catch (final Exception ioe) {
+                logger.warn("Failed to close Record Writer", ioe);
+            }
+
+            try {
+                rollback(topicPartition);
+            } catch (final Exception rollbackException) {
+                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+            }
+
+            throw new ProcessException(e);
+        }
+    }
+
+
+    private void rollback(final TopicPartition topicPartition) {
+        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+        if (offsetAndMetadata == null) {
+            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+        }
+
+        final long offset = offsetAndMetadata.offset();
+        kafkaConsumer.seek(topicPartition, offset);
+    }
+
+
+
+    private void populateAttributes(final BundleTracker tracker) {
+        final Map<String, String> kafkaAttrs = new HashMap<>();
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        if (tracker.key != null && tracker.totalRecords == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+        }
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+        if (tracker.totalRecords > 1) {
+            // Add a record.count attribute to remain consistent with other record-oriented processors. If not
+            // reading/writing records, then use "kafka.count" attribute.
+            if (tracker.recordWriter == null) {
+                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+            } else {
+                kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
+            }
+        }
+        final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+        getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+        tracker.updateFlowFile(newFlowFile);
+    }
+
+    private static class BundleTracker {
+
+        final long initialOffset;
+        final int partition;
+        final String topic;
+        final String key;
+        final RecordSetWriter recordWriter;
+        FlowFile flowFile;
+        long totalRecords = 0;
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+            this(initialRecord, topicPartition, keyEncoding, null);
+        }
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
+            this.initialOffset = initialRecord.offset();
+            this.partition = topicPartition.partition();
+            this.topic = topicPartition.topic();
+            this.recordWriter = recordWriter;
+            this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+        }
+
+        private void incrementRecordCount(final long count) {
+            totalRecords += count;
+        }
+
+        private void updateFlowFile(final FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+
+    }
+
+    private static class BundleInformation {
+        private final TopicPartition topicPartition;
+        private final RecordSchema schema;
+        private final Map<String, String> attributes;
+
+        public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes) {
+            this.topicPartition = topicPartition;
+            this.schema = schema;
+            this.attributes = attributes;
+        }
+
+        @Override
+        public int hashCode() {
+            return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 * attributes.hashCode());
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof BundleInformation)) {
+                return false;
+            }
+
+            final BundleInformation other = (BundleInformation) obj;
+            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes);
+        }
+    }
+}

