nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject [3/5] nifi git commit: NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 to allow batching of FlowFiles within a single publish and to let messages timeout if not acknowledged
Date Thu, 06 Oct 2016 14:06:16 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
deleted file mode 100644
index b3f1bd1..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.mockito.Mockito;
-import static org.mockito.Mockito.times;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.verify;
-
-public class PublishKafkaTest {
-
-    @Test
-    public void validateCustomSerilaizerDeserializerSettings() throws Exception {
-        PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
-        runner.setProperty(PublishKafka_0_10.TOPIC, "foo");
-        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3 sec");
-        runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        runner.assertValid();
-        runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Foo");
-        runner.assertNotValid();
-    }
-
-    @Test
-    public void validatePropertiesValidation() throws Exception {
-        PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
-        runner.setProperty(PublishKafka_0_10.TOPIC, "foo");
-        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "foo");
-
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("'max.block.ms' validated against 'foo' is invalid"));
-        }
-    }
-
-    @Test
-    public void validateCustomValidation() {
-        String topicName = "validateCustomValidation";
-        PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
-
-        /*
-         * Validates that a Kerberos service name is required if one of the
-         * SASL options is set as the security protocol
-         */
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
-        try {
-            runner.run();
-            fail();
-        } catch (Throwable e) {
-            assertTrue(e.getMessage().contains("'Kerberos Service Name' is invalid because"));
-        }
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateSingleCharacterDemarcatedMessages() {
-        String topicName = "validateSingleCharacterDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(PublishKafka_0_10.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
-
-        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
-        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(PublishKafka_0_10.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo");
-
-        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
-        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
-        StubPublishKafka putKafka = new StubPublishKafka(1);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(PublishKafka_0_10.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo");
-
-        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(PublishKafka_0_10.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3000 millis");
-
-        final String text = "Hello World\nGoodbye\nfail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-        putKafka.destroy();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(1);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(PublishKafka_0_10.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
-
-        final String text = "Hello World\nGoodbye\nfail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(PublishKafka_0_10.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
-
-        final String text = "futurefail\nHello World\nGoodbye\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0);
-        assertNotNull(ff);
-        runner.enqueue(ff);
-
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        // 5 sends due to duplication
-        verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(PublishKafka_0_10.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
-
-        final String text = "Hello World\nGoodbye\nfuturefail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0);
-        assertNotNull(ff);
-        runner.enqueue(ff);
-
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        // 6 sends due to duplication
-        verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateDemarcationIntoEmptyMessages() {
-        String topicName = "validateDemarcationIntoEmptyMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        final TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(PublishKafka_0_10.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
-
-        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
-        runner.enqueue(bytes);
-        runner.run(1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexRightPartialDemarcatedMessages() {
-        String topicName = "validateComplexRightPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "僠<僠WILDSTUFF僠>僠");
-
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexLeftPartialDemarcatedMessages() {
-        String topicName = "validateComplexLeftPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "僠<僠WILDSTUFF僠>僠");
-
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexPartialMatchDemarcatedMessages() {
-        String topicName = "validateComplexPartialMatchDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "僠<僠WILDSTUFF僠>僠");
-
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @Test
-    public void validateUtf8Key() {
-        String topicName = "validateUtf8Key";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
-
-        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
-        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
-        runner.run(1);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
-        final Map<Object, Object> msgs = putKafka.getMessagesSent();
-        assertEquals(1, msgs.size());
-        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
-        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
-    }
-
-    @Test
-    public void validateHexKey() {
-        String topicName = "validateUtf8Key";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, KafkaProcessorUtils.HEX_ENCODING);
-        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
-
-        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
-        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
-        runner.run(1);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
-        final Map<Object, Object> msgs = putKafka.getMessagesSent();
-        assertEquals(1, msgs.size());
-        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
-
-        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
deleted file mode 100644
index 76c29cd..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-import org.junit.Test;
-
-public class PublishingContextTest {
-
-    @Test
-    public void failInvalidConstructorArgs() {
-        try {
-            new PublishingContext(null, null);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-        try {
-            new PublishingContext(mock(InputStream.class), null);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            new PublishingContext(mock(InputStream.class), "");
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            new PublishingContext(mock(InputStream.class), "mytopic", -3);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-    }
-
-    @Test
-    public void validateFullSetting() {
-        PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic", 3);
-        publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8));
-        publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8));
-
-        assertEquals("delimiter", new String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8));
-        assertEquals("key", new String(publishingContext.getKeyBytes(), StandardCharsets.UTF_8));
-        assertEquals("topic", publishingContext.getTopic());
-        assertEquals("topic: 'topic'; delimiter: 'delimiter'", publishingContext.toString());
-    }
-
-    @Test
-    public void validateOnlyOnceSetPerInstance() {
-        PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic");
-        publishingContext.setKeyBytes(new byte[]{0});
-        try {
-            publishingContext.setKeyBytes(new byte[]{0});
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        publishingContext.setDelimiterBytes(new byte[]{0});
-        try {
-            publishingContext.setDelimiterBytes(new byte[]{0});
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
deleted file mode 100644
index c009014..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.lang.reflect.Field;
-import static org.mockito.Mockito.when;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
-import org.mockito.Mockito;
-import static org.mockito.Mockito.mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class StubPublishKafka extends PublishKafka_0_10 {
-
-    private volatile Producer<byte[], byte[]> producer;
-
-    private volatile boolean failed;
-
-    private final int ackCheckSize;
-
-    private final ExecutorService executor = Executors.newCachedThreadPool();
-    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
-
-    StubPublishKafka(int ackCheckSize) {
-        this.ackCheckSize = ackCheckSize;
-    }
-
-    public Producer<byte[], byte[]> getProducer() {
-        return producer;
-    }
-
-    public void destroy() {
-        this.executor.shutdownNow();
-    }
-
-    public Map<Object, Object> getMessagesSent() {
-        return new HashMap<>(msgsSent);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
-            throws ProcessException {
-        final Map<String, String> kafkaProperties = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        KafkaPublisher publisher;
-        try {
-            Field f = PublishKafka_0_10.class.getDeclaredField("brokers");
-            f.setAccessible(true);
-            f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
-            publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
-            publisher.setAckWaitTime(15000);
-            producer = mock(Producer.class);
-            this.instrumentProducer(producer, false);
-            Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
-            kf.setAccessible(true);
-            kf.set(publisher, producer);
-
-            Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog");
-            componentLogF.setAccessible(true);
-            componentLogF.set(publisher, mock(ComponentLog.class));
-
-            Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
-            ackCheckSizeField.setAccessible(true);
-            ackCheckSizeField.set(publisher, this.ackCheckSize);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new IllegalStateException(e);
-        }
-        return publisher;
-    }
-
-    @SuppressWarnings("unchecked")
-    private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {
-
-        when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
-            @Override
-            public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
-                if (record != null && record.key() != null) {
-                    msgsSent.put(record.key(), record.value());
-                }
-
-                String value = new String(record.value(), StandardCharsets.UTF_8);
-                if ("fail".equals(value) && !StubPublishKafka.this.failed) {
-                    StubPublishKafka.this.failed = true;
-                    throw new RuntimeException("intentional");
-                }
-                Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() {
-                    @Override
-                    public RecordMetadata call() throws Exception {
-                        if ("futurefail".equals(value) && !StubPublishKafka.this.failed) {
-                            StubPublishKafka.this.failed = true;
-                            throw new TopicAuthorizationException("Unauthorized");
-                        } else {
-                            TopicPartition partition = new TopicPartition("foo", 0);
-                            RecordMetadata meta = new RecordMetadata(partition, 0, 0);
-                            return meta;
-                        }
-                    }
-                });
-                return future;
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
new file mode 100644
index 0000000..e54a10c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestInFlightMessageTracker {
+    // Two sends followed by two acknowledgements: awaitCompletion must keep timing out until the second acknowledgement.
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker();
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        verifyNotComplete(tracker);
+        // After this second acknowledgement the counts match, so awaitCompletion must return without a TimeoutException.
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.awaitCompletion(1L);
+    }
+    // Same counts as above, but the acknowledgements are issued after awaitCompletion was submitted to a second thread.
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker();
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+        // NOTE(review): this executor is never shut down; consider exec.shutdown() once the future has completed.
+        final ExecutorService exec = Executors.newFixedThreadPool(1);
+        final Future<?> future = exec.submit(() -> {
+            try {
+                tracker.awaitCompletion(10000L);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+        // Acknowledge both sends from the test thread once the background wait has been submitted.
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.incrementAcknowledgedCount(flowFile);
+        // get() rethrows any failure of the background task as ExecutionException, which fails the test.
+        future.get();
+    }
+    // Asserts that awaitCompletion(10L) throws TimeoutException, i.e. the tracker does not report completion yet.
+    private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException {
+        try {
+            tracker.awaitCompletion(10L);
+            Assert.fail("Expected timeout");
+        } catch (final TimeoutException te) {
+            // expected
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
new file mode 100644
index 0000000..c7d1a60
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPublishKafka {
+    private static final String TOPIC_NAME = "unit-test";
+    // Mocked pool and lease that are handed to the processor under test instead of a real publisher pool.
+    private PublisherPool mockPool;
+    private PublisherLease mockLease;
+    private TestRunner runner;
+    // Builds a PublishKafka_0_10 whose createPublisherPool returns mockPool and sets its TOPIC property to TOPIC_NAME.
+    @Before
+    public void setup() {
+        mockPool = mock(PublisherPool.class);
+        mockLease = mock(PublisherLease.class);
+
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+        runner = TestRunners.newTestRunner(new PublishKafka_0_10() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext context) {
+                return mockPool;
+            }
+        });
+
+        runner.setProperty(PublishKafka_0_10.TOPIC, TOPIC_NAME);
+    }
+    // One FlowFile reported as successful: routed to success; lease published once, completed once, never poisoned, closed once.
+    @Test
+    public void testSingleSuccess() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("hello world");
+
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+    // Three FlowFiles in a single run(): three publish calls but only one complete()/close() on the lease; all routed to success.
+    @Test
+    public void testMultipleSuccess() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+    // One FlowFile reported as failed by complete(): routed to failure (poison() is not checked in this test).
+    @Test
+    public void testSingleFailure() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("hello world");
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_FAILURE, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+    // Three FlowFiles all reported as failed: all routed to failure with a single complete()/close() on the lease.
+    @Test
+    public void testMultipleFailures() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_FAILURE, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+    // Per-FlowFile message counts (10 and 20) must appear in the "msg.count" attribute of the FlowFiles routed to success.
+    @Test
+    public void testMultipleMessagesPerFlowFile() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 2);
+
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+        // Checking the attribute exists first keeps the getAttribute(...).equals(...) calls below from hitting null.
+        runner.assertAllFlowFilesContainAttribute("msg.count");
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("10"))
+            .count());
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("20"))
+            .count());
+    }
+
+    // Two FlowFiles succeed (counts 10 and 20) and two fail: checks the routing and the "msg.count" attribute on each side.
+    @Test
+    public void testSomeSuccessSomeFailure() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final Map<FlowFile, Exception> failureMap = new HashMap<>();
+        failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
+        failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertTransferCount(PublishKafka_0_10.REL_SUCCESS, 2);
+        runner.assertTransferCount(PublishKafka_0_10.REL_FAILURE, 2);
+
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
+            .filter(ff -> "10".equals(ff.getAttribute("msg.count")))
+            .count());
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
+            .filter(ff -> "20".equals(ff.getAttribute("msg.count")))
+            .count());
+        // FlowFiles routed to failure must not carry a "msg.count" attribute.
+        assertTrue(runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).stream()
+            .noneMatch(ff -> ff.getAttribute("msg.count") != null));
+    }
+
+    // Convenience overload: a result with a single successful FlowFile and the given message count.
+    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+        return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
+    }
+    // Builds a result in which every given FlowFile succeeded with msgCountPerFlowFile messages and nothing failed.
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        for (final FlowFile ff : successfulFlowFiles) {
+            msgCounts.put(ff, msgCountPerFlowFile);
+        }
+        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
+    }
+    // Convenience overload: a result with a single failed FlowFile.
+    private PublishResult createFailurePublishResult(final FlowFile failure) {
+        return createFailurePublishResult(Collections.singleton(failure));
+    }
+    // Builds a result in which every given FlowFile failed with its own RuntimeException and nothing succeeded.
+    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
+        final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
+        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
+    }
+    // Creates a PublishResult that answers directly from the given collections; a FlowFile listed as both success and failure is rejected.
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+        // sanity check.
+        for (final FlowFile success : successFlowFiles) {
+            if (failures.containsKey(success)) {
+                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
+            }
+        }
+
+        return new PublishResult() {
+            @Override
+            public Collection<FlowFile> getSuccessfulFlowFiles() {
+                return successFlowFiles;
+            }
+
+            @Override
+            public Collection<FlowFile> getFailedFlowFiles() {
+                return failures.keySet();
+            }
+            // Reports 0 for a FlowFile that has no entry in msgCounts.
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                Integer count = msgCounts.get(flowFile);
+                return count == null ? 0 : count.intValue();
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
new file mode 100644
index 0000000..c2d143c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+public class TestPublisherLease {
+    private ComponentLog logger;
+    private Producer<byte[], byte[]> producer;
+    // Creates fresh Mockito mocks for the logger and the Kafka producer before every test.
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setup() {
+        logger = Mockito.mock(ComponentLog.class);
+        producer = Mockito.mock(Producer.class);
+    }
+    // An IOException while reading the content must propagate from publish(), poison the lease once, and mark the FlowFile failed.
+    @Test
+    public void testPoisonOnException() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
+            @Override
+            public void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+        // Stream whose read() always throws, standing in for unreadable FlowFile content.
+        final InputStream failureInputStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                throw new IOException("Intentional Unit Test Exception");
+            }
+        };
+
+        try {
+            lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic);
+            Assert.fail("Expected IOException");
+        } catch (final IOException ioe) {
+            // expected
+        }
+
+        assertEquals(1, poisonCount.get());
+
+        final PublishResult result = lease.complete();
+        assertTrue(result.getFailedFlowFiles().contains(flowFile));
+        assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
+    }
+    // A send whose callback reports an exception must poison the lease once and mark the FlowFile failed.
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPoisonOnFailure() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
+            @Override
+            public void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+        // Stub send(record, callback) so the callback is invoked immediately with null metadata and an exception.
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                final Callback callback = invocation.getArgumentAt(1, Callback.class);
+                callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic);
+
+        assertEquals(1, poisonCount.get());
+
+        final PublishResult result = lease.complete();
+        assertTrue(result.getFailedFlowFiles().contains(flowFile));
+        assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
+    }
+    // Publishes four FlowFiles demarcated by "\n" and expects exactly seven "1234567890" records and no other payload at the producer.
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAllDelimitedMessagesSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+        // Count sent records by payload: anything other than "1234567890" (for example an empty message) counts as incorrect.
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("1234567890".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);
+
+        final byte[] flowFileContent = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        final byte[] flowFileContent2 = new byte[0];
+        lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);
+
+        final byte[] flowFileContent3 = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new line
+        lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);
+
+        final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
+        lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);
+        // Publishing alone must neither poison the lease nor flush the producer.
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+        // The stubbed send never invokes its callback; presumably complete() then gives up after the 10L timeout and reports flowFile as failed - confirm against PublisherLease.
+        final PublishResult result = lease.complete();
+        assertTrue(result.getFailedFlowFiles().contains(flowFile));
+        assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
+        // 5 records from the first FlowFile plus 2 from the third; the empty and newline-only FlowFiles contribute none.
+        assertEquals(7, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+        // complete() is expected to have flushed the producer exactly once.
+        verify(producer, times(1)).flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
new file mode 100644
index 0000000..7c70194
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestPublisherPool {
+    // A lease that is closed without being poisoned raises available() from 0 to 1.
+    @Test
+    public void testLeaseCloseReturnsToPool() {
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        kafkaProperties.put("bootstrap.servers", "localhost:1111");
+        kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
+        kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
+
+        final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
+        assertEquals(0, pool.available());
+        // Obtaining a lease does not change available(): it is 0 both before and after.
+        final PublisherLease lease = pool.obtainPublisher();
+        assertEquals(0, pool.available());
+
+        lease.close();
+        assertEquals(1, pool.available());
+    }
+    // A lease that is poisoned before close() leaves available() at 0.
+    @Test
+    public void testPoisonedLeaseNotReturnedToPool() {
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        kafkaProperties.put("bootstrap.servers", "localhost:1111");
+        kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
+        kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
+
+        final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
+        assertEquals(0, pool.available());
+
+        final PublisherLease lease = pool.obtainPublisher();
+        assertEquals(0, pool.available());
+
+        lease.poison();
+        lease.close();
+        assertEquals(0, pool.available());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
deleted file mode 100644
index 0ed00fb..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.test;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Properties;
-
-import kafka.producer.KeyedMessage;
-import kafka.producer.OldProducer;
-
-/**
- * Helper class which helps to produce events targeting {@link EmbeddedKafka}
- * server.
- */
-public class EmbeddedKafkaProducerHelper implements Closeable {
-
-    private final EmbeddedKafka kafkaServer;
-
-    private final OldProducer producer;
-
-    /**
-     * Will create an instance of EmbeddedKafkaProducerHelper based on default
-     * configurations.<br>
-     * Default configuration includes:<br>
-     * <i>
-     * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
-     * serializer.class=kafka.serializer.DefaultEncoder<br>
-     * key.serializer.class=kafka.serializer.DefaultEncoder<br>
-     * auto.create.topics.enable=true
-     * </i><br>
-     * <br>
-     * If you wish to supply additional configuration properties or override
-     * existing use
-     * {@link EmbeddedKafkaProducerHelper#EmbeddedKafkaProducerHelper(EmbeddedKafka, Properties)}
-     * constructor.
-     *
-     * @param kafkaServer
-     *            instance of {@link EmbeddedKafka}
-     */
-    public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer) {
-        this(kafkaServer, null);
-    }
-
-    /**
-     * Will create an instance of EmbeddedKafkaProducerHelper based on default
-     * configurations and additional configuration properties.<br>
-     * Default configuration includes:<br>
-     * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
-     * serializer.class=kafka.serializer.DefaultEncoder<br>
-     * key.serializer.class=kafka.serializer.DefaultEncoder<br>
-     * auto.create.topics.enable=true<br>
-     * <br>
-     *
-     * @param kafkaServer
-     *            instance of {@link EmbeddedKafka}
-     * @param additionalProperties
-     *            instance of {@link Properties} specifying additional producer
-     *            configuration properties.
-     */
-    public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer, Properties additionalProperties) {
-        this.kafkaServer = kafkaServer;
-        Properties producerProperties = new Properties();
-        producerProperties.put("metadata.broker.list", "localhost:" + this.kafkaServer.getKafkaPort());
-        producerProperties.put("serializer.class", "kafka.serializer.DefaultEncoder");
-        producerProperties.put("key.serializer.class", "kafka.serializer.DefaultEncoder");
-        producerProperties.put("auto.create.topics.enable", "true");
-        if (additionalProperties != null) {
-            producerProperties.putAll(additionalProperties);
-        }
-        this.producer = new OldProducer(producerProperties);
-    }
-
-    /**
-     * Will send an event to a Kafka topic. If topic doesn't exist it will be
-     * auto-created.
-     *
-     * @param topicName
-     *            Kafka topic name.
-     * @param event
-     *            string representing an event(message) to be sent to Kafka.
-     */
-    public void sendEvent(String topicName, String event) {
-        KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>(topicName, event.getBytes());
-        this.producer.send(data.topic(), data.key(), data.message());
-    }
-
-    /**
-     * Will close the underlying Kafka producer.
-     */
-    @Override
-    public void close() throws IOException {
-        this.producer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index 5bc0e0e..e524589 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.kafka;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -111,8 +112,9 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link PublishingContext} which hold context
      *            information about the message(s) to be sent.
      * @return The index of the last successful offset.
+     * @throws IOException if unable to read from the Input Stream
      */
-    KafkaPublisherResult publish(PublishingContext publishingContext) {
+    KafkaPublisherResult publish(PublishingContext publishingContext) throws IOException {
         StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
             publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 5792e64..64fdb1d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -223,7 +223,8 @@ public class ConsumeKafka extends AbstractProcessor {
         final byte[] demarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).isSet()
                 ? context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
                 : null;
-        final Map<String, String> props = new HashMap<>();
+
+        final Map<String, Object> props = new HashMap<>();
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index fba8cb5..e13a8c3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -74,7 +74,7 @@ public class ConsumerPool implements Closeable {
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
-            final Map<String, String> kafkaProperties,
+        final Map<String, Object> kafkaProperties,
             final List<String> topics,
             final long maxWaitMillis,
             final String keyEncoding,
@@ -115,6 +115,7 @@ public class ConsumerPool implements Closeable {
              * sitting idle which could prompt excessive rebalances.
              */
             lease = new SimpleConsumerLease(consumer);
+
             /**
              * This subscription tightly couples the lease to the given
              * consumer. They cannot be separated from then on.
@@ -148,7 +149,7 @@ public class ConsumerPool implements Closeable {
         });
     }
 
-    private void closeConsumer(final Consumer consumer) {
+    private void closeConsumer(final Consumer<?, ?> consumer) {
         consumerClosedCountRef.incrementAndGet();
         try {
             consumer.unsubscribe();

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
new file mode 100644
index 0000000..e7d5cb7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Tracks, for each FlowFile, how many messages have been sent and how many have been
+ * acknowledged, and records the first failure reported for each FlowFile. A caller may
+ * block in {@link #awaitCompletion(long)} until every tracked FlowFile is either fully
+ * acknowledged or marked as failed.
+ */
+public class InFlightMessageTracker {
+    private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
+    private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
+    private final Object progressMutex = new Object();
+
+    /**
+     * Records one more acknowledged message for the given FlowFile and wakes any thread
+     * blocked in {@link #awaitCompletion(long)}.
+     *
+     * @param flowFile the FlowFile the acknowledged message belongs to
+     */
+    public void incrementAcknowledgedCount(final FlowFile flowFile) {
+        countsFor(flowFile).incrementAcknowledgedCount();
+        signalProgress();
+    }
+
+    /**
+     * @param flowFile the FlowFile of interest
+     * @return the number of acknowledged messages recorded for the FlowFile, or 0 if it is not tracked
+     */
+    public int getAcknowledgedCount(final FlowFile flowFile) {
+        final Counts counter = messageCountsByFlowFile.get(flowFile);
+        return (counter == null) ? 0 : counter.getAcknowledgedCount();
+    }
+
+    /**
+     * Records one more sent message for the given FlowFile.
+     *
+     * @param flowFile the FlowFile the sent message belongs to
+     */
+    public void incrementSentCount(final FlowFile flowFile) {
+        countsFor(flowFile).incrementSentCount();
+    }
+
+    /**
+     * @param flowFile the FlowFile of interest
+     * @return the number of sent messages recorded for the FlowFile, or 0 if it is not tracked
+     */
+    public int getSentCount(final FlowFile flowFile) {
+        final Counts counter = messageCountsByFlowFile.get(flowFile);
+        return (counter == null) ? 0 : counter.getSentCount();
+    }
+
+    /**
+     * Marks the given FlowFile as failed, keeping the first exception reported for it, and
+     * wakes any thread blocked in {@link #awaitCompletion(long)}.
+     *
+     * @param flowFile the FlowFile that failed
+     * @param exception the reason for the failure
+     */
+    public void fail(final FlowFile flowFile, final Exception exception) {
+        failures.putIfAbsent(flowFile, exception);
+        signalProgress();
+    }
+
+    /**
+     * @param flowFile the FlowFile of interest
+     * @return the exception recorded for the FlowFile, or {@code null} if none was recorded
+     */
+    public Exception getFailure(final FlowFile flowFile) {
+        return failures.get(flowFile);
+    }
+
+    /**
+     * @param flowFile the FlowFile of interest
+     * @return {@code true} if a failure has been recorded for the FlowFile
+     */
+    public boolean isFailed(final FlowFile flowFile) {
+        return getFailure(flowFile) != null;
+    }
+
+    /**
+     * Discards all counts and failures recorded so far.
+     */
+    public void reset() {
+        messageCountsByFlowFile.clear();
+        failures.clear();
+    }
+
+    /**
+     * Marks every tracked FlowFile that is not yet complete as failed with the given exception.
+     * A failure that is already recorded for a FlowFile is never replaced.
+     *
+     * @param exception the reason to record for the outstanding FlowFiles
+     * @return the publish result as of the end of this call
+     */
+    public PublishResult failOutstanding(final Exception exception) {
+        // isComplete(ff) is already true for failed FlowFiles, so no separate failure filter is
+        // needed; putIfAbsent keeps a cause recorded concurrently by fail(...) from being overwritten.
+        messageCountsByFlowFile.keySet().stream()
+            .filter(ff -> !isComplete(ff))
+            .forEach(ff -> failures.putIfAbsent(ff, exception));
+
+        return createPublishResult();
+    }
+
+    // Returns the counters for the FlowFile, creating them on first use.
+    private Counts countsFor(final FlowFile flowFile) {
+        return messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+    }
+
+    // Wakes every thread waiting on progressMutex so that each one re-checks for completion.
+    private void signalProgress() {
+        synchronized (progressMutex) {
+            progressMutex.notifyAll();
+        }
+    }
+
+    private boolean isComplete(final FlowFile flowFile) {
+        final Counts counts = messageCountsByFlowFile.get(flowFile);
+        if (counts == null) {
+            // The entry is gone (e.g. cleared by a concurrent reset()), so nothing is outstanding.
+            return true;
+        }
+
+        if (counts.getAcknowledgedCount() == counts.getSentCount()) {
+            // all messages received successfully.
+            return true;
+        }
+
+        // FlowFile failed so is complete
+        return failures.containsKey(flowFile);
+    }
+
+    private boolean isComplete() {
+        return messageCountsByFlowFile.keySet().stream()
+            .allMatch(flowFile -> isComplete(flowFile));
+    }
+
+    /**
+     * Blocks until every tracked FlowFile is complete or the given amount of time has elapsed.
+     * Completion is always checked at least once, even for a non-positive timeout.
+     *
+     * @param millis the maximum number of milliseconds to wait
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws TimeoutException if some FlowFile is still incomplete once the time has elapsed
+     */
+    void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
+        final long startTime = System.nanoTime();
+        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(millis);
+
+        synchronized (progressMutex) {
+            while (!isComplete()) {
+                // Compare elapsed durations, not absolute nanoTime values, which may wrap around.
+                final long remainingNanos = timeoutNanos - (System.nanoTime() - startTime);
+                if (remainingNanos <= 0) {
+                    throw new TimeoutException();
+                }
+
+                // Wait only for the time that is left, so wake-ups that do not complete the
+                // batch cannot stretch the total wait beyond the requested timeout.
+                TimeUnit.NANOSECONDS.timedWait(progressMutex, remainingNanos);
+            }
+        }
+    }
+
+    /**
+     * Creates a result describing the state of this tracker at the time of the call. The
+     * result is a snapshot: later updates to this tracker, including {@link #reset()}, do
+     * not change it.
+     *
+     * @return the publish result
+     */
+    PublishResult createPublishResult() {
+        final ConcurrentMap<FlowFile, Exception> failureSnapshot = new ConcurrentHashMap<>(failures);
+        final ConcurrentMap<FlowFile, Integer> acknowledgedSnapshot = new ConcurrentHashMap<>();
+        messageCountsByFlowFile.forEach((flowFile, counts) -> acknowledgedSnapshot.put(flowFile, counts.getAcknowledgedCount()));
+
+        final Set<FlowFile> failedFlowFiles = new HashSet<>(failureSnapshot.keySet());
+        final Set<FlowFile> successfulFlowFiles = new HashSet<>(acknowledgedSnapshot.keySet());
+        successfulFlowFiles.removeAll(failedFlowFiles);
+
+        return new PublishResult() {
+            @Override
+            public Collection<FlowFile> getSuccessfulFlowFiles() {
+                return successfulFlowFiles;
+            }
+
+            @Override
+            public Collection<FlowFile> getFailedFlowFiles() {
+                return failedFlowFiles;
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(final FlowFile flowFile) {
+                return acknowledgedSnapshot.getOrDefault(flowFile, 0);
+            }
+
+            @Override
+            public Exception getReasonForFailure(final FlowFile flowFile) {
+                return failureSnapshot.get(flowFile);
+            }
+        };
+    }
+
+    /**
+     * Sent and acknowledged message counters for a single FlowFile.
+     */
+    public static class Counts {
+        private final AtomicInteger sentCount = new AtomicInteger(0);
+        private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
+
+        public void incrementSentCount() {
+            sentCount.incrementAndGet();
+        }
+
+        public void incrementAcknowledgedCount() {
+            acknowledgedCount.incrementAndGet();
+        }
+
+        public int getAcknowledgedCount() {
+            return acknowledgedCount.get();
+        }
+
+        public int getSentCount() {
+            return sentCount.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 707a431..3d09f2d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -27,8 +27,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-import org.apache.kafka.clients.CommonClientConfigs;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
@@ -186,7 +187,7 @@ final class KafkaProcessorUtils {
 
         final Class<?> classType;
 
-        public KafkaConfigValidator(final Class classType) {
+        public KafkaConfigValidator(final Class<?> classType) {
             this.classType = classType;
         }
 
@@ -211,7 +212,8 @@ final class KafkaProcessorUtils {
         return builder.toString();
     }
 
-    static void buildCommonKafkaProperties(final ProcessContext context, final Class kafkaConfigClass, final Map<String, String> mapToPopulate) {
+
+    static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
         for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
             if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
                 // Translate SSLContext Service configuration into Kafka properties
@@ -230,28 +232,33 @@ final class KafkaProcessorUtils {
                     mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
                 }
             }
-            String pName = propertyDescriptor.getName();
-            String pValue = propertyDescriptor.isExpressionLanguageSupported()
+
+            String propertyName = propertyDescriptor.getName();
+            String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
                     : context.getProperty(propertyDescriptor).getValue();
-            if (pValue != null) {
-                if (pName.endsWith(".ms")) { // kafka standard time notation
-                    pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS));
+
+            if (propertyValue != null) {
+                // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
+                // or the standard NiFi time period such as "5 secs"
+                if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
+                    propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
                 }
-                if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
-                    mapToPopulate.put(pName, pValue);
+
+                if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+                    mapToPopulate.put(propertyName, propertyValue);
                 }
             }
         }
     }
 
-    private static boolean isStaticStringFieldNamePresent(final String name, final Class... classes) {
+    private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
         return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
     }
 
-    private static Set<String> getPublicStaticStringFieldValues(final Class... classes) {
+    private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
         final Set<String> strings = new HashSet<>();
-        for (final Class classType : classes) {
+        for (final Class<?> classType : classes) {
             for (final Field field : classType.getDeclaredFields()) {
                 if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
                     try {


Mime
View raw message