nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject [1/3] nifi git commit: NIFI-1420 Adding Splunk bundle containing PutSplunk, and GetSplunk, and adding a ListenTCP processor to standard processors. Refactored internal code from PutSyslog to create a generic AbstractPutEventProcessor which PutSplunk exte
Date Mon, 07 Mar 2016 23:21:55 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 4ce7b679e -> 6f5fb5947


http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
new file mode 100644
index 0000000..c79d593
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"listen", "tcp", "tls", "ssl"})
+@CapabilityDescription("Listens for incoming TCP connections and reads data from each connection using a line separator " +
+        "as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can " +
+        "be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be " +
+        "set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then " +
+        "the Receive Buffer Size must be greater than 100kb.")
+@WritesAttributes({
+        @WritesAttribute(attribute="tcp.sender", description="The sending host of the messages."),
+        @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received.")
+})
+public class ListenTCP extends AbstractListenEventProcessor<ListenTCP.TCPEvent> {
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
+                    "messages will be received over a secure connection.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+            .name("Client Auth")
+            .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
+            .required(false)
+            .allowableValues(SSLContextService.ClientAuth.values())
+            .defaultValue(SSLContextService.ClientAuth.REQUIRED.name())
+            .build();
+
+    // it is only the array reference that is volatile - not the contents.
+    private volatile byte[] messageDemarcatorBytes;
+
+    @Override
+    protected List<PropertyDescriptor> getAdditionalProperties() {
+        return Arrays.asList(
+                MAX_CONNECTIONS,
+                MAX_BATCH_SIZE,
+                MESSAGE_DELIMITER,
+                SSL_CONTEXT_SERVICE,
+                CLIENT_AUTH
+        );
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
+        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
+            results.add(new ValidationResult.Builder()
+                    .explanation("Client Auth must be provided when using TLS/SSL")
+                    .valid(false).subject("Client Auth").build());
+        }
+
+        return results;
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        super.onScheduled(context);
+        final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+    }
+
+    @Override
+    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<TCPEvent> events)
+            throws IOException {
+
+        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
+        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
+
+        // initialize the buffer pool based on max number of connections and the buffer size
+        final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections);
+        for (int i = 0; i < maxConnections; i++) {
+            bufferPool.offer(ByteBuffer.allocate(bufferSize));
+        }
+
+        final EventFactory<TCPEvent> eventFactory = new TCPEventFactory();
+
+        // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
+        SSLContext sslContext = null;
+        SslContextFactory.ClientAuth clientAuth = null;
+
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+            sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuthValue));
+            clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue);
+        }
+
+        final ChannelHandlerFactory<TCPEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
+        return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charSet);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes);
+
+        // if the size is 0 then there was nothing to process so return
+        // we don't need to yield here because we have a long poll inside of getBatches
+        if (batches.size() == 0) {
+            return;
+        }
+
+        for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<TCPEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            // the sender will be the same for all events based on the batch key
+            final String sender = events.get(0).getSender();
+
+            final Map<String,String> attributes = new HashMap<>(3);
+            attributes.put("tcp.sender", sender);
+            attributes.put("tcp.port", String.valueOf(port));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] {flowFile});
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
+
+            // create a provenance receive event
+            final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
+            final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":")
+                    .append(port).toString();
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+        }
+    }
+
+    /**
+     * Event implementation for TCP.
+     */
+    static class TCPEvent<C extends SelectableChannel> extends StandardEvent<C> {
+
+        public TCPEvent(String sender, byte[] data, ChannelResponder<C> responder) {
+            super(sender, data, responder);
+        }
+    }
+
+    /**
+     * Factory implementation for TCPEvents.
+     */
+    static final class TCPEventFactory implements EventFactory<TCPEvent> {
+
+        @Override
+        public TCPEvent create(byte[] data, Map<String, String> metadata, ChannelResponder responder) {
+            String sender = null;
+            if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) {
+                sender = metadata.get(EventFactory.SENDER_KEY);
+            }
+            return new TCPEvent(sender, data, responder);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index e555a0c..373e402 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,19 +34,17 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
+import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
+import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
 import org.apache.nifi.processors.standard.syslog.SyslogParser;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.StopWatch;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SocketChannel;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -81,12 +78,13 @@ public class PutSyslog extends AbstractSyslogProcessor {
             .defaultValue("localhost")
             .required(true)
             .build();
-    public static final PropertyDescriptor SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
-            .name("Send Buffer Size")
-            .description("The size of each buffer used to send a Syslog message. Adjust this value appropriately based on the expected size of the " +
-                    "Syslog messages being produced. Messages larger than this buffer size will still be sent, but will not make use of the buffer pool.")
+    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Socket Send Buffer")
+            .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("2048 B")
+            .defaultValue("1 MB")
             .required(true)
             .build();
     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
@@ -164,8 +162,6 @@ public class PutSyslog extends AbstractSyslogProcessor {
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
-
-    private volatile BlockingQueue<ByteBuffer> bufferPool;
     private volatile BlockingQueue<ChannelSender> senderPool;
 
     @Override
@@ -174,9 +170,10 @@ public class PutSyslog extends AbstractSyslogProcessor {
         descriptors.add(HOSTNAME);
         descriptors.add(PROTOCOL);
         descriptors.add(PORT);
+        descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(IDLE_EXPIRATION);
-        descriptors.add(SEND_BUFFER_SIZE);
+        descriptors.add(TIMEOUT);
         descriptors.add(BATCH_SIZE);
         descriptors.add(CHARSET);
         descriptors.add(MSG_PRIORITY);
@@ -221,40 +218,40 @@ public class PutSyslog extends AbstractSyslogProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException {
-        final int bufferSize = context.getProperty(SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        this.bufferPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
-        for (int i=0; i < context.getMaxConcurrentTasks(); i++) {
-            this.bufferPool.offer(ByteBuffer.allocate(bufferSize));
-        }
-
         // initialize the queue of senders, one per task, senders will get created on the fly in onTrigger
         this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
     }
 
-    protected ChannelSender createSender(final ProcessContext context, final BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+    protected ChannelSender createSender(final ProcessContext context) throws IOException {
         final int port = context.getProperty(PORT).asInteger();
         final String host = context.getProperty(HOSTNAME).getValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
-        final String charSet = context.getProperty(CHARSET).getValue();
+        final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        return createSender(sslContextService, protocol, host, port, Charset.forName(charSet), bufferPool);
+        return createSender(sslContextService, protocol, host, port, maxSendBuffer, timeout);
     }
 
     // visible for testing to override and provide a mock sender if desired
-    protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host, final int port,
-                                         final Charset charset, final BlockingQueue<ByteBuffer> bufferPool)
+    protected ChannelSender createSender(final SSLContextService sslContextService, final String protocol, final String host,
+                                         final int port, final int maxSendBufferSize, final int timeout)
             throws IOException {
+
+        ChannelSender sender;
         if (protocol.equals(UDP_VALUE.getValue())) {
-            return new DatagramChannelSender(host, port, bufferPool, charset);
+            sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger());
         } else {
             // if an SSLContextService is provided then we make a secure sender
             if (sslContextService != null) {
                 final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
-                return new SSLSocketChannelSender(sslContext, host, port, bufferPool, charset);
+                sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger());
             } else {
-                return new SocketChannelSender(host, port, bufferPool, charset);
+                sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger());
             }
         }
+        sender.setTimeout(timeout);
+        sender.open();
+        return sender;
     }
 
     @OnStopped
@@ -275,7 +272,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
         // if a connection hasn't been used with in the threshold then it gets closed
         ChannelSender sender;
         while ((sender = senderPool.poll()) != null) {
-            if (currentTime > (sender.lastUsed + idleThreshold)) {
+            if (currentTime > (sender.getLastUsed() + idleThreshold)) {
                 getLogger().debug("Closing idle connection...");
                 sender.close();
             } else {
@@ -309,7 +306,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
         if (sender == null) {
             try {
                 getLogger().debug("No available connections, creating a new one...");
-                sender = createSender(context, bufferPool);
+                sender = createSender(context);
             } catch (IOException e) {
                 for (final FlowFile flowFile : flowFiles) {
                     getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
@@ -325,6 +322,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
         final String host = context.getProperty(HOSTNAME).getValue();
         final String transitUri = new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
         final ObjectHolder<IOException> exceptionHolder = new ObjectHolder<>(null);
+        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
 
         try {
             for (FlowFile flowFile : flowFiles) {
@@ -352,7 +350,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
                             messageBuilder.append('\n');
                         }
 
-                        sender.send(messageBuilder.toString());
+                        sender.send(messageBuilder.toString(), charSet);
                         timer.stop();
 
                         final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
@@ -396,157 +394,4 @@ public class PutSyslog extends AbstractSyslogProcessor {
         return false;
     }
 
-    /**
-     * Base class for sending messages over a channel.
-     */
-    protected static abstract class ChannelSender {
-
-        final int port;
-        final String host;
-        final BlockingQueue<ByteBuffer> bufferPool;
-        final Charset charset;
-        volatile long lastUsed;
-
-        ChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
-            this.port = port;
-            this.host = host;
-            this.bufferPool = bufferPool;
-            this.charset = charset;
-        }
-
-        public void send(final String message) throws IOException {
-            final byte[] bytes = message.getBytes(charset);
-
-            boolean shouldReturn = true;
-            ByteBuffer buffer = bufferPool.poll();
-            if (buffer == null) {
-                buffer = ByteBuffer.allocate(bytes.length);
-                shouldReturn = false;
-            } else if (buffer.limit() < bytes.length) {
-                // we need a large buffer so return the one we got and create a new bigger one
-                bufferPool.offer(buffer);
-                buffer = ByteBuffer.allocate(bytes.length);
-                shouldReturn = false;
-            }
-
-            try {
-                buffer.clear();
-                buffer.put(bytes);
-                buffer.flip();
-                write(buffer);
-                lastUsed = System.currentTimeMillis();
-            } finally {
-                if (shouldReturn) {
-                    bufferPool.offer(buffer);
-                }
-            }
-        }
-
-        // write the given buffer to the underlying channel
-        abstract void write(ByteBuffer buffer) throws IOException;
-
-        // returns true if the underlying channel is connected
-        abstract boolean isConnected();
-
-        // close the underlying channel
-        abstract void close();
-    }
-
-    /**
-     * Sends messages over a DatagramChannel.
-     */
-    private static class DatagramChannelSender extends ChannelSender {
-
-        final DatagramChannel channel;
-
-        DatagramChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
-            super(host, port, bufferPool, charset);
-            this.channel = DatagramChannel.open();
-            this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
-        }
-
-        @Override
-        public void write(ByteBuffer buffer) throws IOException {
-            while (buffer.hasRemaining()) {
-                channel.write(buffer);
-            }
-        }
-
-        @Override
-        boolean isConnected() {
-            return channel != null && channel.isConnected();
-        }
-
-        @Override
-        public void close() {
-            IOUtils.closeQuietly(channel);
-        }
-    }
-
-    /**
-     * Sends messages over a SocketChannel.
-     */
-    private static class SocketChannelSender extends ChannelSender {
-
-        final SocketChannel channel;
-
-        SocketChannelSender(final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
-            super(host, port, bufferPool, charset);
-            this.channel = SocketChannel.open();
-            this.channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
-        }
-
-        @Override
-        public void write(ByteBuffer buffer) throws IOException {
-            while (buffer.hasRemaining()) {
-                channel.write(buffer);
-            }
-        }
-
-        @Override
-        boolean isConnected() {
-            return channel != null && channel.isConnected();
-        }
-
-        @Override
-        public void close() {
-            IOUtils.closeQuietly(channel);
-        }
-    }
-
-    /**
-     * Sends messages over an SSLSocketChannel.
-     */
-    private static class SSLSocketChannelSender extends ChannelSender {
-
-        final SSLSocketChannel channel;
-
-        SSLSocketChannelSender(final SSLContext sslContext, final String host, final int port, final BlockingQueue<ByteBuffer> bufferPool, final Charset charset) throws IOException {
-            super(host, port, bufferPool, charset);
-            this.channel = new SSLSocketChannel(sslContext, host, port, true);
-            this.channel.connect();
-        }
-
-        @Override
-        public void send(final String message) throws IOException {
-            final byte[] bytes = message.getBytes(charset);
-            channel.write(bytes);
-            lastUsed = System.currentTimeMillis();
-        }
-
-        @Override
-        public void write(ByteBuffer buffer) throws IOException {
-            // nothing to do here since we are overriding send() above
-        }
-
-        @Override
-        boolean isConnected() {
-            return channel != null && !channel.isClosed();
-        }
-
-        @Override
-        public void close() {
-            IOUtils.closeQuietly(channel);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 5781fad..d3d765e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -48,6 +48,7 @@ org.apache.nifi.processors.standard.ListFile
 org.apache.nifi.processors.standard.ListenHTTP
 org.apache.nifi.processors.standard.ListenRELP
 org.apache.nifi.processors.standard.ListenSyslog
+org.apache.nifi.processors.standard.ListenTCP
 org.apache.nifi.processors.standard.ListenUDP
 org.apache.nifi.processors.standard.ListSFTP
 org.apache.nifi.processors.standard.LogAttribute

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
new file mode 100644
index 0000000..29fa690
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog server forwarding
+ * to ListenSyslog, or PutSyslog sending to a syslog server.
+ */
+public class TestListenAndPutSyslog {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(TestListenAndPutSyslog.class);
+
+    private ListenSyslog listenSyslog;
+    private TestRunner listenSyslogRunner;
+
+    private PutSyslog putSyslog;
+    private TestRunner putSyslogRunner;
+
+    @Before
+    public void setup() {
+        this.listenSyslog = new ListenSyslog();
+        this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
+
+        this.putSyslog = new PutSyslog();
+        this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
+    }
+
+    @After
+    public void teardown() {
+        try {
+            putSyslog.onStopped();
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        }
+        try {
+            listenSyslog.onUnscheduled();
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    public void testUDP() throws IOException, InterruptedException {
+        run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
+    }
+
+    @Test
+    public void testTCP() throws IOException, InterruptedException {
+        run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
+    }
+
+    @Test
+    public void testTLS() throws InitializationException, IOException, InterruptedException {
+        configureSSLContextService(listenSyslogRunner);
+        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+        configureSSLContextService(putSyslogRunner);
+        putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+        run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
+    }
+
+    @Test
+    public void testTLSListenerNoTLSPut() throws InitializationException, IOException, InterruptedException {
+        configureSSLContextService(listenSyslogRunner);
+        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, "ssl-context");
+
+        // send 7 but expect 0 because sender didn't use TLS
+        run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
+    }
+
+    private SSLContextService configureSSLContextService(TestRunner runner) throws InitializationException {
+        final SSLContextService sslContextService = new StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+        runner.enableControllerService(sslContextService);
+        return sslContextService;
+    }
+
+    /**
+     * Sends numMessages from PutSyslog to ListenSyslog.
+     */
+    private void run(String protocol, int numMessages, int expectedMessages) throws IOException, InterruptedException {
+        // set the same protocol on both processors
+        putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
+        listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
+
+        // set a listening port of 0 to get a random available port
+        listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
+
+        // call onScheduled to start ListenSyslog listening
+        final ProcessSessionFactory processSessionFactory = listenSyslogRunner.getProcessSessionFactory();
+        final ProcessContext context = listenSyslogRunner.getProcessContext();
+        listenSyslog.onScheduled(context);
+
+        // get the real port it is listening on and set that in PutSyslog
+        final int listeningPort = listenSyslog.getPort();
+        putSyslogRunner.setProperty(PutSyslog.PORT, String.valueOf(listeningPort));
+
+        // configure the message properties on PutSyslog
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2016-02-05T22:14:15.003Z";
+        final String host = "localhost";
+        final String body = "some message";
+        final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body;
+
+        putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
+        putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
+
+        // send the messages
+        for (int i=0; i < numMessages; i++) {
+            putSyslogRunner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")));
+        }
+        putSyslogRunner.run(numMessages, false);
+
+        // trigger ListenSyslog until we've seen all the messages
+        int numTransfered = 0;
+        long timeout = System.currentTimeMillis() + 30000;
+
+        while (numTransfered < expectedMessages && System.currentTimeMillis() < timeout) {
+            Thread.sleep(10);
+            listenSyslog.onTrigger(context, processSessionFactory);
+            numTransfered = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+        }
+        Assert.assertEquals("Did not process all the messages", expectedMessages, numTransfered);
+
+        if (expectedMessages > 0) {
+            // check that the first flow file has the expected content
+            MockFlowFile mockFlowFile = listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            mockFlowFile.assertContentEquals(expectedMessage);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
index 877a55a..b885e49 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
@@ -194,11 +194,24 @@ public class TestListenRELP {
             // send the frames to the port the processors is listening on
             sendFrames(frames, socket);
 
-            // call onTrigger until we processed all the frames, or a certain amount of time passes
-            long responseTimeout = 10000;
-            long startTime = System.currentTimeMillis();
-            while (proc.responses.size() < expectedTransferred
-                    && (System.currentTimeMillis() - startTime < responseTimeout)) {
+            long responseTimeout = 30000;
+
+            // this first loop waits until the internal queue of the processor has the expected
+            // number of messages ready before proceeding, we want to guarantee they are all there
+            // before onTrigger gets a chance to run
+            long startTimeQueueSizeCheck = System.currentTimeMillis();
+            while (proc.getQueueSize() < expectedResponses
+                    && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
+                Thread.sleep(100);
+            }
+
+            // want to fail here if the queue size isn't what we expect
+            Assert.assertEquals(expectedResponses, proc.getQueueSize());
+
+            // call onTrigger until we get a response for all the frames, or a certain amount of time passes
+            long startTimeProcessing = System.currentTimeMillis();
+            while (proc.responses.size() < expectedResponses
+                    && (System.currentTimeMillis() - startTimeProcessing < responseTimeout)) {
                 proc.onTrigger(context, processSessionFactory);
                 Thread.sleep(100);
             }
@@ -221,7 +234,6 @@ public class TestListenRELP {
         for (final RELPFrame frame : frames) {
             byte[] encodedFrame = encoder.encode(frame);
             socket.getOutputStream().write(encodedFrame);
-            Thread.sleep(1);
         }
         socket.getOutputStream().flush();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index cd8621c..2743caf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -62,7 +62,8 @@ public class TestListenSyslog {
     static final String HOST = "localhost.home";
     static final String BODY = "some message";
 
-    static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
+    static final String VALID_MESSAGE = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY ;
+    static final String VALID_MESSAGE_TCP = "<" + PRI + ">" + TIME + " " + HOST + " " + BODY + "\n";
     static final String INVALID_MESSAGE = "this is not valid\n";
 
     @Test
@@ -135,7 +136,7 @@ public class TestListenSyslog {
         Assert.assertTrue(port > 0);
 
         // write some TCP messages to the port in the background
-        final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
+        final Thread sender = new Thread(new SingleConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
         sender.setDaemon(true);
         sender.start();
 
@@ -185,7 +186,7 @@ public class TestListenSyslog {
         Assert.assertTrue(port > 0);
 
         // send 3 messages as 1
-        final String multipleMessages = VALID_MESSAGE + "\n" + VALID_MESSAGE + "\n" + VALID_MESSAGE;
+        final String multipleMessages = VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n" + VALID_MESSAGE_TCP + "\n";
         final Thread sender = new Thread(new SingleConnectionSocketSender(port, 1, 10, multipleMessages));
         sender.setDaemon(true);
         sender.start();
@@ -237,7 +238,7 @@ public class TestListenSyslog {
         Assert.assertTrue(port > 0);
 
         // write some TCP messages to the port in the background
-        final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE));
+        final Thread sender = new Thread(new MultiConnectionSocketSender(port, numMessages, 10, VALID_MESSAGE_TCP));
         sender.setDaemon(true);
         sender.start();
 
@@ -292,7 +293,7 @@ public class TestListenSyslog {
         Assert.assertTrue(port > 0);
 
         // write some UDP messages to the port in the background
-        final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE.replaceAll("\\n", "")));
+        final Thread sender = new Thread(new DatagramSender(port, numMessages, 10, VALID_MESSAGE));
         sender.setDaemon(true);
         sender.start();
         sender.join();
@@ -432,7 +433,7 @@ public class TestListenSyslog {
 
 
     private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) {
-        flowFile.assertContentEquals(VALID_MESSAGE);
+        flowFile.assertContentEquals(VALID_MESSAGE.replace("\n", ""));
         Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key()));
         Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key()));
         Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
new file mode 100644
index 0000000..ef05eab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestListenTCP {
+
+    private ListenTCP proc;
+    private TestRunner runner;
+
+    @Before
+    public void setup() {
+        proc = new ListenTCP();
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(ListenTCP.PORT, "0");
+    }
+
+    @Test
+    public void testCustomValidate() throws InitializationException {
+        runner.setProperty(ListenTCP.PORT, "1");
+        runner.assertValid();
+
+        configureProcessorSslContextService();
+        runner.setProperty(ListenTCP.CLIENT_AUTH, "");
+        runner.assertNotValid();
+
+        runner.setProperty(ListenTCP.CLIENT_AUTH, SslContextFactory.ClientAuth.REQUIRED.name());
+        runner.assertValid();
+    }
+
+    @Test
+    public void testListenTCP() throws IOException, InterruptedException {
+        final List<String> messages = new ArrayList<>();
+        messages.add("This is message 1\n");
+        messages.add("This is message 2\n");
+        messages.add("This is message 3\n");
+        messages.add("This is message 4\n");
+        messages.add("This is message 5\n");
+
+        runTCP(messages, messages.size(), null);
+
+        List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
+        for (int i=0; i < mockFlowFiles.size(); i++) {
+            mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
+        }
+    }
+
+    @Test
+    public void testListenTCPBatching() throws IOException, InterruptedException {
+        runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3");
+
+        final List<String> messages = new ArrayList<>();
+        messages.add("This is message 1\n");
+        messages.add("This is message 2\n");
+        messages.add("This is message 3\n");
+        messages.add("This is message 4\n");
+        messages.add("This is message 5\n");
+
+        runTCP(messages, 2, null);
+
+        List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
+
+        MockFlowFile mockFlowFile1 = mockFlowFiles.get(0);
+        mockFlowFile1.assertContentEquals("This is message 1\nThis is message 2\nThis is message 3");
+
+        MockFlowFile mockFlowFile2 = mockFlowFiles.get(1);
+        mockFlowFile2.assertContentEquals("This is message 4\nThis is message 5");
+    }
+
+    @Test
+    public void testTLSClienAuthRequiredAndClientCertProvided() throws InitializationException, IOException, InterruptedException,
+            UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+
+        runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.REQUIRED.name());
+        configureProcessorSslContextService();
+
+        final List<String> messages = new ArrayList<>();
+        messages.add("This is message 1\n");
+        messages.add("This is message 2\n");
+        messages.add("This is message 3\n");
+        messages.add("This is message 4\n");
+        messages.add("This is message 5\n");
+
+        // Make an SSLContext with a key and trust store to send the test messages
+        final SSLContext clientSslContext = SslContextFactory.createSslContext(
+                "src/test/resources/localhost-ks.jks",
+                "localtest".toCharArray(),
+                "jks",
+                "src/test/resources/localhost-ts.jks",
+                "localtest".toCharArray(),
+                "jks",
+                org.apache.nifi.security.util.SslContextFactory.ClientAuth.valueOf("NONE"),
+                "TLS");
+
+        runTCP(messages, messages.size(), clientSslContext);
+
+        List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
+        for (int i=0; i < mockFlowFiles.size(); i++) {
+            mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
+        }
+    }
+
+    @Test
+    public void testTLSClienAuthRequiredAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException,
+            UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+
+        runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.REQUIRED.name());
+        configureProcessorSslContextService();
+
+        final List<String> messages = new ArrayList<>();
+        messages.add("This is message 1\n");
+        messages.add("This is message 2\n");
+        messages.add("This is message 3\n");
+        messages.add("This is message 4\n");
+        messages.add("This is message 5\n");
+
+        // Make an SSLContext that only has the trust store, this should not work since the processor has client auth REQUIRED
+        final SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
+                "src/test/resources/localhost-ts.jks",
+                "localtest".toCharArray(),
+                "jks",
+                "TLS");
+
+        try {
+            runTCP(messages, messages.size(), clientSslContext);
+            Assert.fail("Should have thrown exception");
+        } catch (Exception e) {
+
+        }
+    }
+
+    @Test
+    public void testTLSClienAuthNoneAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException,
+            UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+
+        runner.setProperty(ListenTCP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name());
+        configureProcessorSslContextService();
+
+        final List<String> messages = new ArrayList<>();
+        messages.add("This is message 1\n");
+        messages.add("This is message 2\n");
+        messages.add("This is message 3\n");
+        messages.add("This is message 4\n");
+        messages.add("This is message 5\n");
+
+        // Make an SSLContext that only has the trust store; this should still work since the processor has client auth NONE
+        final SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
+                "src/test/resources/localhost-ts.jks",
+                "localtest".toCharArray(),
+                "jks",
+                "TLS");
+
+        runTCP(messages, messages.size(), clientSslContext);
+
+        List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
+        for (int i=0; i < mockFlowFiles.size(); i++) {
+            mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1));
+        }
+    }
+
+    protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext)
+            throws IOException, InterruptedException {
+
+        Socket socket = null;
+        try {
+            // schedule to start listening on a random port
+            final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+            final ProcessContext context = runner.getProcessContext();
+            proc.onScheduled(context);
+
+            // create a client connection to the port the dispatcher is listening on
+            final int realPort = proc.getDispatcherPort();
+
+            // create either a regular socket or ssl socket based on context being passed in
+            if (sslContext != null) {
+                socket = sslContext.getSocketFactory().createSocket("localhost", realPort);
+            } else {
+                socket = new Socket("localhost", realPort);
+            }
+            Thread.sleep(100);
+
+            // send the messages to the port the processor is listening on
+            for (final String message : messages) {
+                socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
+                Thread.sleep(1);
+            }
+            socket.getOutputStream().flush();
+
+            long responseTimeout = 10000;
+
+            // this first loop waits until the internal queue of the processor has the expected
+            // number of messages ready before proceeding, we want to guarantee they are all there
+            // before onTrigger gets a chance to run
+            long startTimeQueueSizeCheck = System.currentTimeMillis();
+            while (proc.getQueueSize() < messages.size()
+                    && (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
+                Thread.sleep(100);
+            }
+
+            // want to fail here if the queue size isn't what we expect
+            Assert.assertEquals(messages.size(), proc.getQueueSize());
+
+            // call onTrigger until we have processed all the messages, or a certain amount of time passes
+            int numTransferred = 0;
+            long startTime = System.currentTimeMillis();
+            while (numTransferred < expectedTransferred  && (System.currentTimeMillis() - startTime < responseTimeout)) {
+                proc.onTrigger(context, processSessionFactory);
+                numTransferred = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS).size();
+                Thread.sleep(100);
+            }
+
+            // should have transferred the expected events
+            runner.assertTransferCount(ListenTCP.REL_SUCCESS, expectedTransferred);
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled();
+            IOUtils.closeQuietly(socket);
+        }
+    }
+
+    private SSLContextService configureProcessorSslContextService() throws InitializationException {
+        final SSLContextService sslContextService = new StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+        runner.enableControllerService(sslContextService);
+
+        runner.setProperty(ListenTCP.SSL_CONTEXT_SERVICE, "ssl-context");
+        return sslContextService;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
index c96d105..60fe37d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.util.put.sender.ChannelSender;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.ssl.SSLContextService;
@@ -27,14 +28,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 public class TestPutSyslog {
 
@@ -327,8 +325,9 @@ public class TestPutSyslog {
         }
 
         @Override
-        protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port,
-                                             Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+        protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host,
+                                             int port, int maxSendBuffer, int timeout)
+                throws IOException {
             return mockSender;
         }
     }
@@ -346,8 +345,9 @@ public class TestPutSyslog {
         }
 
         @Override
-        protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host, int port,
-                                             Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException {
+        protected ChannelSender createSender(SSLContextService sslContextService, String protocol, String host,
+                                             int port, int maxSendBuffer, int timeout)
+                throws IOException {
             if (numSendersCreated >= numSendersAllowed) {
                 throw new IOException("too many senders");
             }
@@ -357,61 +357,70 @@ public class TestPutSyslog {
     }
 
     // Mock sender that saves any messages passed to send()
-    static class MockCollectingSender extends PutSyslog.ChannelSender {
+    static class MockCollectingSender extends ChannelSender {
 
         List<String> messages = new ArrayList<>();
 
         public MockCollectingSender() throws IOException {
-            super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8"));
-            this.bufferPool.offer(ByteBuffer.allocate(1024));
+            super("myhost", 0, 0, null);
         }
 
         @Override
-        public void send(String message) throws IOException {
+        public void open() throws IOException {
+
+        }
+
+        @Override
+        public void send(String message, Charset charset) throws IOException {
             messages.add(message);
-            super.send(message);
+            super.send(message, charset);
         }
 
         @Override
-        void write(ByteBuffer buffer) throws IOException {
+        protected void write(byte[] buffer) throws IOException {
 
         }
 
         @Override
-        boolean isConnected() {
+        public boolean isConnected() {
             return true;
         }
 
         @Override
-        void close() {
+        public void close() {
 
         }
     }
 
     // Mock sender that throws IOException on calls to write() or send()
-    static class MockErrorSender extends PutSyslog.ChannelSender {
+    static class MockErrorSender extends ChannelSender {
 
         public MockErrorSender() throws IOException {
-            super(null, 0, null, null);
+            super(null, 0, 0, null);
+        }
+
+        @Override
+        public void open() throws IOException {
+
         }
 
         @Override
-        public void send(String message) throws IOException {
+        public void send(String message, Charset charset) throws IOException {
             throw new IOException("error");
         }
 
         @Override
-        void write(ByteBuffer buffer) throws IOException {
+        protected void write(byte[] data) throws IOException {
             throw new IOException("error");
         }
 
         @Override
-        boolean isConnected() {
+       public boolean isConnected() {
             return false;
         }
 
         @Override
-        void close() {
+        public void close() {
 
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index e6df432..0639d5a 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -54,6 +54,7 @@
         <module>nifi-scripting-bundle</module>
         <module>nifi-elasticsearch-bundle</module>
         <module>nifi-amqp-bundle</module>
+	<module>nifi-splunk-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ac29b01..6b20a7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1064,6 +1064,12 @@ language governing permissions and limitations under the License. -->
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-elasticsearch-nar</artifactId>
                 <version>0.6.0-SNAPSHOT</version>
+		<type>nar</type>
+	    </dependency>
+	    <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-splunk-nar</artifactId>
+                <version>0.6.0-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
             <dependency>


Mime
View raw message