nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mcgil...@apache.org
Subject [1/2] nifi git commit: NIFI-4664, NIFI-4662, NIFI-4660, NIFI-4659 moved tests which are timing/threading/network dependent and brittle to integration tests and un-ignored tests that are IT. Updated travis to reduce impact on infra and appveyor now skips
Date Wed, 06 Dec 2017 15:53:22 GMT
Repository: nifi
Updated Branches:
  refs/heads/master c730f802b -> cdc1facf3


http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
new file mode 100644
index 0000000..69956e7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITConsumeKafka {
+
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
+
+    @Before
+    public void setup() {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
new file mode 100644
index 0000000..7d54180
--- /dev/null
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.lumberjack.handler;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.net.ssl.SSLContext;
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+
+
+public class ITLumberjackSocketChannelHandler {
+    private EventFactory<TestEvent> eventFactory;
+    private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
+    private BlockingQueue<ByteBuffer> byteBuffers;
+    private BlockingQueue<TestEvent> events;
+    private ComponentLog logger = Mockito.mock(ComponentLog.class);
+    private int maxConnections;
+    private SSLContext sslContext;
+    private Charset charset;
+    private ChannelDispatcher dispatcher;
+
+    @Before
+    public void setup() {
+        eventFactory = new TestEventHolderFactory();
+        channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
+
+        byteBuffers = new LinkedBlockingQueue<>();
+        byteBuffers.add(ByteBuffer.allocate(4096));
+
+        events = new LinkedBlockingQueue<>();
+        logger = Mockito.mock(ComponentLog.class);
+
+        maxConnections = 1;
+        sslContext = null;
+        charset = StandardCharsets.UTF_8;
+
+        dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
+                maxConnections, sslContext, charset);
+
+    }
+
+    @Test
+    public void testBasicHandling() throws IOException, InterruptedException {
+        final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" +
+                "031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02" +
+                "1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41" +
+                "29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575" +
+                "a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000" +
+                "0ffff35e0eff0";
+        final List<String> messages = new ArrayList<>();
+        messages.add(multiFrameData);
+
+        run(messages);
+
+        // Check for the 4 frames (from the hex string above) are back...
+        Assert.assertEquals(4, events.size());
+
+        boolean found1 = false;
+        boolean found2 = false;
+        boolean found3 = false;
+        boolean found4 = false;
+
+        TestEvent event;
+        while((event = events.poll()) != null) {
+            Map<String,String> metadata = event.metadata;
+            Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY));
+
+            final String seqNum = metadata.get(LumberjackMetadata.SEQNUMBER_KEY);
+            if (seqNum.equals("1")) {
+                found1 = true;
+            } else if (seqNum.equals("2")) {
+                found2 = true;
+            } else if (seqNum.equals("3")) {
+                found3 = true;
+            } else if (seqNum.equals("4")) {
+                found4 = true;
+            }
+        }
+
+        Assert.assertTrue(found1);
+        Assert.assertTrue(found2);
+        Assert.assertTrue(found3);
+        Assert.assertTrue(found4);
+    }
+
+    protected void run(List<String> messages) throws IOException, InterruptedException {
+        final ByteBuffer buffer = ByteBuffer.allocate(1024);
+        try {
+            // starts the dispatcher listening on port 0 so it selects a random port
+            dispatcher.open(null, 0, 4096);
+
+            // starts a thread to run the dispatcher which will accept/read connections
+            Thread dispatcherThread = new Thread(dispatcher);
+            dispatcherThread.start();
+
+
+            // create a client connection to the port the dispatcher is listening on
+            final int realPort = dispatcher.getPort();
+            try (SocketChannel channel = SocketChannel.open()) {
+                channel.connect(new InetSocketAddress("localhost", realPort));
+                Thread.sleep(100);
+
+                // send the provided messages
+                for (int i=0; i < messages.size(); i++) {
+                    buffer.clear();
+                    buffer.put(DatatypeConverter.parseHexBinary(messages.get(i)));
+                    buffer.flip();
+
+                    while (buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+                    Thread.sleep(1);
+                }
+            }
+
+            // wait up to 10 seconds to verify the responses
+            long timeout = 10000;
+            long startTime = System.currentTimeMillis();
+            while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
+                Thread.sleep(100);
+            }
+
+            // should have gotten an event for each message sent
+            Assert.assertEquals(4, events.size());
+
+        } finally {
+            // stop the dispatcher thread and ensure we shut down handler threads
+            dispatcher.close();
+        }
+    }
+
+    // Test event to produce from the data
+    private static class TestEvent implements Event<SocketChannel> {
+
+        private byte[] data;
+        private Map<String, String> metadata;
+
+        public TestEvent(byte[] data, Map<String, String> metadata) {
+            this.data = data;
+            this.metadata = metadata;
+        }
+
+        @Override
+        public String getSender() {
+            return metadata.get(EventFactory.SENDER_KEY);
+        }
+
+        @Override
+        public byte[] getData() {
+            return data;
+        }
+
+        @Override
+        public ChannelResponder<SocketChannel> getResponder() {
+            return null;
+        }
+    }
+
+    // Factory to create test events and send responses for testing
+    private static class TestEventHolderFactory implements EventFactory<TestEvent> {
+
+        @Override
+        public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
+            return new TestEvent(data, metadata);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java
deleted file mode 100644
index ee5a040..0000000
--- a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.lumberjack.handler;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.net.ssl.SSLContext;
-import javax.xml.bind.DatatypeConverter;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-
-
-
-public class TestLumberjackSocketChannelHandler {
-    private EventFactory<TestEvent> eventFactory;
-    private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
-    private BlockingQueue<ByteBuffer> byteBuffers;
-    private BlockingQueue<TestEvent> events;
-    private ComponentLog logger = Mockito.mock(ComponentLog.class);
-    private int maxConnections;
-    private SSLContext sslContext;
-    private Charset charset;
-    private ChannelDispatcher dispatcher;
-
-    @Before
-    public void setup() {
-        eventFactory = new TestEventHolderFactory();
-        channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
-
-        byteBuffers = new LinkedBlockingQueue<>();
-        byteBuffers.add(ByteBuffer.allocate(4096));
-
-        events = new LinkedBlockingQueue<>();
-        logger = Mockito.mock(ComponentLog.class);
-
-        maxConnections = 1;
-        sslContext = null;
-        charset = StandardCharsets.UTF_8;
-
-        dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger,
-                maxConnections, sslContext, charset);
-
-    }
-
-    @Test
-    public void testBasicHandling() throws IOException, InterruptedException {
-        final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" +
-                "031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02" +
-                "1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41" +
-                "29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575" +
-                "a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000" +
-                "0ffff35e0eff0";
-        final List<String> messages = new ArrayList<>();
-        messages.add(multiFrameData);
-
-        run(messages);
-
-        // Check for the 4 frames (from the hex string above) are back...
-        Assert.assertEquals(4, events.size());
-
-        boolean found1 = false;
-        boolean found2 = false;
-        boolean found3 = false;
-        boolean found4 = false;
-
-        TestEvent event;
-        while((event = events.poll()) != null) {
-            Map<String,String> metadata = event.metadata;
-            Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY));
-
-            final String seqNum = metadata.get(LumberjackMetadata.SEQNUMBER_KEY);
-            if (seqNum.equals("1")) {
-                found1 = true;
-            } else if (seqNum.equals("2")) {
-                found2 = true;
-            } else if (seqNum.equals("3")) {
-                found3 = true;
-            } else if (seqNum.equals("4")) {
-                found4 = true;
-            }
-        }
-
-        Assert.assertTrue(found1);
-        Assert.assertTrue(found2);
-        Assert.assertTrue(found3);
-        Assert.assertTrue(found4);
-    }
-
-    protected void run(List<String> messages) throws IOException, InterruptedException {
-        final ByteBuffer buffer = ByteBuffer.allocate(1024);
-        try {
-            // starts the dispatcher listening on port 0 so it selects a random port
-            dispatcher.open(null, 0, 4096);
-
-            // starts a thread to run the dispatcher which will accept/read connections
-            Thread dispatcherThread = new Thread(dispatcher);
-            dispatcherThread.start();
-
-
-            // create a client connection to the port the dispatcher is listening on
-            final int realPort = dispatcher.getPort();
-            try (SocketChannel channel = SocketChannel.open()) {
-                channel.connect(new InetSocketAddress("localhost", realPort));
-                Thread.sleep(100);
-
-                // send the provided messages
-                for (int i=0; i < messages.size(); i++) {
-                    buffer.clear();
-                    buffer.put(DatatypeConverter.parseHexBinary(messages.get(i)));
-                    buffer.flip();
-
-                    while (buffer.hasRemaining()) {
-                        channel.write(buffer);
-                    }
-                    Thread.sleep(1);
-                }
-            }
-
-            // wait up to 10 seconds to verify the responses
-            long timeout = 10000;
-            long startTime = System.currentTimeMillis();
-            while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
-                Thread.sleep(100);
-            }
-
-            // should have gotten an event for each message sent
-            Assert.assertEquals(4, events.size());
-
-        } finally {
-            // stop the dispatcher thread and ensure we shut down handler threads
-            dispatcher.close();
-        }
-    }
-
-    // Test event to produce from the data
-    private static class TestEvent implements Event<SocketChannel> {
-
-        private byte[] data;
-        private Map<String, String> metadata;
-
-        public TestEvent(byte[] data, Map<String, String> metadata) {
-            this.data = data;
-            this.metadata = metadata;
-        }
-
-        @Override
-        public String getSender() {
-            return metadata.get(EventFactory.SENDER_KEY);
-        }
-
-        @Override
-        public byte[] getData() {
-            return data;
-        }
-
-        @Override
-        public ChannelResponder<SocketChannel> getResponder() {
-            return null;
-        }
-    }
-
-    // Factory to create test events and send responses for testing
-    private static class TestEventHolderFactory implements EventFactory<TestEvent> {
-
-        @Override
-        public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
-            return new TestEvent(data, metadata);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy
new file mode 100644
index 0000000..1f44225
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ITListenSyslogGroovy.groovy
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard
+
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSessionFactory
+import org.apache.nifi.processors.standard.syslog.SyslogParser
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bouncycastle.util.encoders.Hex
+import org.junit.After
+import org.junit.Assert
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+@RunWith(JUnit4.class)
+class ListenSyslogGroovyTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(ListenSyslogGroovyTest.class)
+
+    static final String ZERO_LENGTH_MESSAGE = "     \n"
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() throws Exception {
+    }
+
+    @After
+    void tearDown() throws Exception {
+    }
+
+    @Test
+    void testShouldHandleZeroLengthUDP() throws Exception {
+        // Arrange
+        final ListenSyslog proc = new ListenSyslog()
+        final TestRunner runner = TestRunners.newTestRunner(proc)
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue())
+        runner.setProperty(ListenSyslog.PORT, "0")
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory()
+        final ProcessContext context = runner.getProcessContext()
+        proc.onScheduled(context)
+
+        // Inject a SyslogParser which will always return null
+        def nullEventParser = [parseEvent: { byte[] bytes, String sender ->
+            logger.mock("Regardless of input bytes: [${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return null")
+            return null
+        }] as SyslogParser
+        proc.parser = nullEventParser
+
+        final int numMessages = 10
+        final int port = proc.getPort()
+        Assert.assertTrue(port > 0)
+
+        // write some TCP messages to the port in the background
+        final Thread sender = new Thread(new ITListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, ZERO_LENGTH_MESSAGE))
+        sender.setDaemon(true)
+        sender.start()
+
+        // Act
+
+        // call onTrigger until we read all messages, or 30 seconds passed
+        try {
+            int numFailed = 0
+            long timeout = System.currentTimeMillis() + 30000
+
+            while (numFailed < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(50)
+                proc.onTrigger(context, processSessionFactory)
+                numFailed = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size()
+            }
+
+            int numSuccess = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size()
+            logger.info("Transferred " + numSuccess + " to SUCCESS and " + numFailed + " to INVALID")
+
+            // Assert
+
+            // all messages should be transferred to invalid
+            Assert.assertEquals("Did not process all the messages", numMessages, numFailed)
+
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled()
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy
deleted file mode 100644
index 1c6b4f8..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard
-
-import org.apache.nifi.processor.ProcessContext
-import org.apache.nifi.processor.ProcessSessionFactory
-import org.apache.nifi.processors.standard.syslog.SyslogParser
-import org.apache.nifi.util.TestRunner
-import org.apache.nifi.util.TestRunners
-import org.bouncycastle.util.encoders.Hex
-import org.junit.After
-import org.junit.Assert
-import org.junit.Before
-import org.junit.BeforeClass
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.JUnit4
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-
-@RunWith(JUnit4.class)
-class ListenSyslogGroovyTest extends GroovyTestCase {
-    private static final Logger logger = LoggerFactory.getLogger(ListenSyslogGroovyTest.class)
-
-    static final String ZERO_LENGTH_MESSAGE = "     \n"
-
-    @BeforeClass
-    static void setUpOnce() throws Exception {
-        logger.metaClass.methodMissing = { String name, args ->
-            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
-        }
-    }
-
-    @Before
-    void setUp() throws Exception {
-    }
-
-    @After
-    void tearDown() throws Exception {
-    }
-
-    @Test
-    void testShouldHandleZeroLengthUDP() throws Exception {
-        // Arrange
-        final ListenSyslog proc = new ListenSyslog()
-        final TestRunner runner = TestRunners.newTestRunner(proc)
-        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue())
-        runner.setProperty(ListenSyslog.PORT, "0")
-
-        // schedule to start listening on a random port
-        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory()
-        final ProcessContext context = runner.getProcessContext()
-        proc.onScheduled(context)
-
-        // Inject a SyslogParser which will always return null
-        def nullEventParser = [parseEvent: { byte[] bytes, String sender ->
-            logger.mock("Regardless of input bytes: [${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return null")
-            return null
-        }] as SyslogParser
-        proc.parser = nullEventParser
-
-        final int numMessages = 10
-        final int port = proc.getPort()
-        Assert.assertTrue(port > 0)
-
-        // write some TCP messages to the port in the background
-        final Thread sender = new Thread(new TestListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, ZERO_LENGTH_MESSAGE))
-        sender.setDaemon(true)
-        sender.start()
-
-        // Act
-
-        // call onTrigger until we read all messages, or 30 seconds passed
-        try {
-            int numFailed = 0
-            long timeout = System.currentTimeMillis() + 30000
-
-            while (numFailed < numMessages && System.currentTimeMillis() < timeout) {
-                Thread.sleep(50)
-                proc.onTrigger(context, processSessionFactory)
-                numFailed = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size()
-            }
-
-            int numSuccess = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size()
-            logger.info("Transferred " + numSuccess + " to SUCCESS and " + numFailed + " to INVALID")
-
-            // Assert
-
-            // all messages should be transferred to invalid
-            Assert.assertEquals("Did not process all the messages", numMessages, numFailed)
-
-        } finally {
-            // unschedule to close connections
-            proc.onUnscheduled()
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java
new file mode 100644
index 0000000..a94bb20
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenSyslog.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.nifi.processors.standard.TestListenSyslog.DatagramSender;
+
+public class ITListenSyslog {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(ITListenSyslog.class);
+
+    static final String PRI = "34";
+    static final String SEV = "2";
+    static final String FAC = "4";
+    static final String TIME = "Oct 13 15:43:23";
+    static final String HOST = "localhost.home";
+    static final String BODY = "some message";
+
+    static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY;
+    static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
+    static final String INVALID_MESSAGE = "this is not valid\n";
+
+    @Test
+    public void testUDP() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 20;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some UDP messages to the port in the background
+        final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all datagrams, or 30 seconds passed
+        try {
+            int numTransferred = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (numTransferred < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(10);
+                proc.onTrigger(context, processSessionFactory);
+                numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+            }
+            Assert.assertEquals("Did not process all the datagrams", numMessages, numTransferred);
+
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+            Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp"));
+
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled();
+        }
+    }
+
+    @Test
+    public void testTCPSingleConnection() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        // Allow time for the processor to perform its scheduled start
+        Thread.sleep(500);
+
+        final int numMessages = 20;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some TCP messages to the port in the background
+        final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all messages, or 30 seconds passed
+        try {
+            int nubTransferred = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(10);
+                proc.onTrigger(context, processSessionFactory);
+                nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+            }
+            Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
+
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+            Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
+
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled();
+        }
+    }
+
+    @Test
+    public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 3;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // send 3 messages as 1
+        final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n";
+        final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all messages, or 30 seconds passed
+        try {
+            int nubTransferred = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(10);
+                proc.onTrigger(context, processSessionFactory);
+                nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+            }
+            Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
+
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+            Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
+
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled();
+        }
+    }
+
+    @Test
+    public void testTCPMultipleConnection() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 20;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some TCP messages to the port in the background
+        final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all messages, or 30 seconds passed
+        try {
+            int nubTransferred = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(10);
+                proc.onTrigger(context, processSessionFactory);
+                nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+            }
+            Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
+
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
+
+            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+            Assert.assertNotNull(events);
+            Assert.assertEquals(numMessages, events.size());
+
+            final ProvenanceEventRecord event = events.get(0);
+            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+            Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
+
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled();
+        }
+    }
+
+    @Test
+    public void testInvalid() throws IOException, InterruptedException {
+        final ListenSyslog proc = new ListenSyslog();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.PORT, "0");
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        final ProcessContext context = runner.getProcessContext();
+        proc.onScheduled(context);
+
+        final int numMessages = 10;
+        final int port = proc.getPort();
+        Assert.assertTrue(port > 0);
+
+        // write some TCP messages to the port in the background
+        final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE));
+        sender.setDaemon(true);
+        sender.start();
+
+        // call onTrigger until we read all messages, or 30 seconds passed
+        try {
+            int nubTransferred = 0;
+            long timeout = System.currentTimeMillis() + 30000;
+
+            while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
+                Thread.sleep(50);
+                proc.onTrigger(context, processSessionFactory);
+                nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
+            }
+
+            // all messages should be transferred to invalid
+            Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
+
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled();
+        }
+    }
+
+    private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) {
+        flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", ""));
+        Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key()));
+        Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key()));
+        Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key()));
+        Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key()));
+        Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.HOSTNAME.key()));
+        Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.BODY.key()));
+        Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.VALID.key()));
+        Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.PORT.key()));
+        Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.PROTOCOL.key()));
+        Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key())));
+    }
+
+    /**
+     * Sends a given number of datagrams to the given port.
+     */
+    public static final class SingleConnectionSocketSender implements Runnable {
+
+        final int port;
+        final int numMessages;
+        final long delay;
+        final String message;
+
+        public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) {
+            this.port = port;
+            this.numMessages = numMessages;
+            this.delay = delay;
+            this.message = message;
+        }
+
+        @Override
+        public void run() {
+            byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
+            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+
+            try (SocketChannel channel = SocketChannel.open()) {
+                channel.connect(new InetSocketAddress("localhost", port));
+
+                for (int i = 0; i < numMessages; i++) {
+                    buffer.clear();
+                    buffer.put(bytes);
+                    buffer.flip();
+
+                    while (buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+                    Thread.sleep(delay);
+                }
+            } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+            } catch (InterruptedException e) {
+                LOGGER.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Sends a given number of datagrams to the given port.
+     */
+    public static final class MultiConnectionSocketSender implements Runnable {
+
+        final int port;
+        final int numMessages;
+        final long delay;
+        final String message;
+
+        public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) {
+            this.port = port;
+            this.numMessages = numMessages;
+            this.delay = delay;
+            this.message = message;
+        }
+
+        @Override
+        public void run() {
+            byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
+            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+
+            for (int i = 0; i < numMessages; i++) {
+                try (SocketChannel channel = SocketChannel.open()) {
+                    channel.connect(new InetSocketAddress("localhost", port));
+
+                    buffer.clear();
+                    buffer.put(bytes);
+                    buffer.flip();
+
+                    while (buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+                    Thread.sleep(delay);
+                } catch (IOException e) {
+                    LOGGER.error(e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    LOGGER.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index f96ff22..2c199c1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -41,7 +41,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
-import java.nio.channels.SocketChannel;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -62,216 +61,11 @@ public class TestListenSyslog {
     static final String HOST = "localhost.home";
     static final String BODY = "some message";
 
-    static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY ;
+    static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY;
     static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
     static final String INVALID_MESSAGE = "this is not valid\n";
 
     @Test
-    public void testUDP() throws IOException, InterruptedException {
-        final ListenSyslog proc = new ListenSyslog();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.UDP_VALUE.getValue());
-        runner.setProperty(ListenSyslog.PORT, "0");
-
-        // schedule to start listening on a random port
-        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-        final ProcessContext context = runner.getProcessContext();
-        proc.onScheduled(context);
-
-        final int numMessages = 20;
-        final int port = proc.getPort();
-        Assert.assertTrue(port > 0);
-
-        // write some UDP messages to the port in the background
-        final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
-        sender.setDaemon(true);
-        sender.start();
-
-        // call onTrigger until we read all datagrams, or 30 seconds passed
-        try {
-            int numTransferred = 0;
-            long timeout = System.currentTimeMillis() + 30000;
-
-            while (numTransferred < numMessages && System.currentTimeMillis() < timeout) {
-                Thread.sleep(10);
-                proc.onTrigger(context, processSessionFactory);
-                numTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
-            }
-            Assert.assertEquals("Did not process all the datagrams", numMessages, numTransferred);
-
-            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
-
-            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
-            Assert.assertNotNull(events);
-            Assert.assertEquals(numMessages, events.size());
-
-            final ProvenanceEventRecord event = events.get(0);
-            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
-            Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("udp"));
-
-        } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-        }
-    }
-
-    @Test
-    public void testTCPSingleConnection() throws IOException, InterruptedException {
-        final ListenSyslog proc = new ListenSyslog();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
-        runner.setProperty(ListenSyslog.PORT, "0");
-
-        // schedule to start listening on a random port
-        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-        final ProcessContext context = runner.getProcessContext();
-        proc.onScheduled(context);
-
-        // Allow time for the processor to perform its scheduled start
-        Thread.sleep(500);
-
-        final int numMessages = 20;
-        final int port = proc.getPort();
-        Assert.assertTrue(port > 0);
-
-        // write some TCP messages to the port in the background
-        final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
-        sender.setDaemon(true);
-        sender.start();
-
-        // call onTrigger until we read all messages, or 30 seconds passed
-        try {
-            int nubTransferred = 0;
-            long timeout = System.currentTimeMillis() + 30000;
-
-            while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
-                Thread.sleep(10);
-                proc.onTrigger(context, processSessionFactory);
-                nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
-            }
-            Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
-
-            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
-
-            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
-            Assert.assertNotNull(events);
-            Assert.assertEquals(numMessages, events.size());
-
-            final ProvenanceEventRecord event = events.get(0);
-            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
-            Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
-
-        } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-        }
-    }
-
-    @Test
-    public void testTCPSingleConnectionWithNewLines() throws IOException, InterruptedException {
-        final ListenSyslog proc = new ListenSyslog();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
-        runner.setProperty(ListenSyslog.PORT, "0");
-
-        // schedule to start listening on a random port
-        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-        final ProcessContext context = runner.getProcessContext();
-        proc.onScheduled(context);
-
-        final int numMessages = 3;
-        final int port = proc.getPort();
-        Assert.assertTrue(port > 0);
-
-        // send 3 messages as 1
-        final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n";
-        final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages));
-        sender.setDaemon(true);
-        sender.start();
-
-        // call onTrigger until we read all messages, or 30 seconds passed
-        try {
-            int nubTransferred = 0;
-            long timeout = System.currentTimeMillis() + 30000;
-
-            while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
-                Thread.sleep(10);
-                proc.onTrigger(context, processSessionFactory);
-                nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
-            }
-            Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
-
-            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
-
-            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
-            Assert.assertNotNull(events);
-            Assert.assertEquals(numMessages, events.size());
-
-            final ProvenanceEventRecord event = events.get(0);
-            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
-            Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
-
-        } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-        }
-    }
-
-    @Test
-    public void testTCPMultipleConnection() throws IOException, InterruptedException {
-        final ListenSyslog proc = new ListenSyslog();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
-        runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
-        runner.setProperty(ListenSyslog.PORT, "0");
-
-        // schedule to start listening on a random port
-        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-        final ProcessContext context = runner.getProcessContext();
-        proc.onScheduled(context);
-
-        final int numMessages = 20;
-        final int port = proc.getPort();
-        Assert.assertTrue(port > 0);
-
-        // write some TCP messages to the port in the background
-        final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
-        sender.setDaemon(true);
-        sender.start();
-
-        // call onTrigger until we read all messages, or 30 seconds passed
-        try {
-            int nubTransferred = 0;
-            long timeout = System.currentTimeMillis() + 30000;
-
-            while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
-                Thread.sleep(10);
-                proc.onTrigger(context, processSessionFactory);
-                nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
-            }
-            Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
-
-            MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
-
-            final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
-            Assert.assertNotNull(events);
-            Assert.assertEquals(numMessages, events.size());
-
-            final ProvenanceEventRecord event = events.get(0);
-            Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
-            Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("tcp"));
-
-        } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-        }
-    }
-
-    @Test
     public void testBatching() throws IOException, InterruptedException {
         final ListenSyslog proc = new ListenSyslog();
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -325,47 +119,6 @@ public class TestListenSyslog {
     }
 
     @Test
-    public void testInvalid() throws IOException, InterruptedException {
-        final ListenSyslog proc = new ListenSyslog();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
-        runner.setProperty(ListenSyslog.PORT, "0");
-
-        // schedule to start listening on a random port
-        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
-        final ProcessContext context = runner.getProcessContext();
-        proc.onScheduled(context);
-
-        final int numMessages = 10;
-        final int port = proc.getPort();
-        Assert.assertTrue(port > 0);
-
-        // write some TCP messages to the port in the background
-        final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 100, INVALID_MESSAGE));
-        sender.setDaemon(true);
-        sender.start();
-
-        // call onTrigger until we read all messages, or 30 seconds passed
-        try {
-            int nubTransferred = 0;
-            long timeout = System.currentTimeMillis() + 30000;
-
-            while (nubTransferred < numMessages && System.currentTimeMillis() < timeout) {
-                Thread.sleep(50);
-                proc.onTrigger(context, processSessionFactory);
-                nubTransferred = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
-            }
-
-            // all messages should be transferred to invalid
-            Assert.assertEquals("Did not process all the messages", numMessages, nubTransferred);
-
-        } finally {
-            // unschedule to close connections
-            proc.onUnscheduled();
-        }
-    }
-
-    @Test
     public void testParsingError() throws IOException {
         final FailParseProcessor proc = new FailParseProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -431,21 +184,6 @@ public class TestListenSyslog {
         runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0).assertContentEquals(VALID_MESSAGE);
     }
 
-
-    private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) {
-        flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", ""));
-        Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key()));
-        Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key()));
-        Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key()));
-        Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key()));
-        Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.HOSTNAME.key()));
-        Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.BODY.key()));
-        Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.VALID.key()));
-        Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.PORT.key()));
-        Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.PROTOCOL.key()));
-        Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key())));
-    }
-
     /**
      * Sends a given number of datagrams to the given port.
      */
@@ -470,51 +208,7 @@ public class TestListenSyslog {
 
             try (DatagramChannel channel = DatagramChannel.open()) {
                 channel.connect(new InetSocketAddress("localhost", port));
-                for (int i=0; i < numMessages; i++) {
-                    buffer.clear();
-                    buffer.put(bytes);
-                    buffer.flip();
-
-                    while(buffer.hasRemaining()) {
-                        channel.write(buffer);
-                    }
-
-                    Thread.sleep(delay);
-                }
-            } catch (IOException e) {
-                LOGGER.error(e.getMessage(), e);
-            } catch (InterruptedException e) {
-                LOGGER.error(e.getMessage(), e);
-            }
-        }
-    }
-
-    /**
-     * Sends a given number of datagrams to the given port.
-     */
-    public static final class SingleConnectionSocketSender implements Runnable {
-
-        final int port;
-        final int numMessages;
-        final long delay;
-        final String message;
-
-        public SingleConnectionSocketSender(int port, int numMessages, long delay, String message) {
-            this.port = port;
-            this.numMessages = numMessages;
-            this.delay = delay;
-            this.message = message;
-        }
-
-        @Override
-        public void run() {
-            byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
-            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
-
-            try (SocketChannel channel = SocketChannel.open()) {
-                channel.connect(new InetSocketAddress("localhost", port));
-
-                for (int i=0; i < numMessages; i++) {
+                for (int i = 0; i < numMessages; i++) {
                     buffer.clear();
                     buffer.put(bytes);
                     buffer.flip();
@@ -522,6 +216,7 @@ public class TestListenSyslog {
                     while (buffer.hasRemaining()) {
                         channel.write(buffer);
                     }
+
                     Thread.sleep(delay);
                 }
             } catch (IOException e) {
@@ -532,51 +227,9 @@ public class TestListenSyslog {
         }
     }
 
-    /**
-     * Sends a given number of datagrams to the given port.
-     */
-    public static final class MultiConnectionSocketSender implements Runnable {
-
-        final int port;
-        final int numMessages;
-        final long delay;
-        final String message;
-
-        public MultiConnectionSocketSender(int port, int numMessages, long delay, String message) {
-            this.port = port;
-            this.numMessages = numMessages;
-            this.delay = delay;
-            this.message = message;
-        }
-
-        @Override
-        public void run() {
-            byte[] bytes = message.getBytes(Charset.forName("UTF-8"));
-            final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
-
-            for (int i=0; i < numMessages; i++) {
-                try (SocketChannel channel = SocketChannel.open()) {
-                    channel.connect(new InetSocketAddress("localhost", port));
-
-                    buffer.clear();
-                    buffer.put(bytes);
-                    buffer.flip();
-
-                    while (buffer.hasRemaining()) {
-                        channel.write(buffer);
-                    }
-                    Thread.sleep(delay);
-                } catch (IOException e) {
-                    LOGGER.error(e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    LOGGER.error(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
     // A mock version of ListenSyslog that will queue the provided events
     private static class FailParseProcessor extends ListenSyslog {
+
         @Override
         protected SyslogParser getParser() {
             return new SyslogParser(StandardCharsets.UTF_8) {
@@ -589,6 +242,7 @@ public class TestListenSyslog {
     }
 
     private static class CannedMessageProcessor extends ListenSyslog {
+
         private final Iterator<RawSyslogEvent> eventItr;
 
         public CannedMessageProcessor(final List<RawSyslogEvent> events) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a5f408c..146a3a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1985,7 +1985,9 @@
             <!-- Performs execution of Integration Tests using the Maven 
                 FailSafe Plugin. The view of integration tests in this context are those 
                 tests interfacing with external sources and services requiring additional 
-                resources or credentials that cannot be explicitly provided. -->
+                resources or credentials that cannot be explicitly provided. Also appropriate
+                for tests which depend on inter-thread and/or network or having timing
+                considerations which could make the tests brittle on various environments.-->
             <id>integration-tests</id>
             <build>
                 <plugins>


Mime
View raw message