nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [25/51] [partial] nifi git commit: NIFI-850 removed nifi parent, updated nifi pom, moved all nifi subdirs up one level, fixed readme.
Date Sat, 15 Aug 2015 17:13:14 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
new file mode 100644
index 0000000..27d676a
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLSocket;
+
+import org.apache.nifi.logging.NiFiLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class SocketUtils {
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class));
+
+    public static Socket createSocket(final InetSocketAddress address, final SocketConfiguration config) throws IOException {
+        if (address == null) {
+            throw new IllegalArgumentException("Socket address may not be null.");
+        } else if (config == null) {
+            throw new IllegalArgumentException("Configuration may not be null.");
+        }
+
+        final Socket socket;
+
+        final SSLContext sslContext;
+        try {
+            sslContext = config.createSSLContext();
+        } catch (final Exception e) {
+            throw new IOException("Could not create SSLContext", e);
+        }
+
+        if (sslContext == null) {
+            socket = new Socket(address.getHostName(), address.getPort());
+        } else {
+            socket = sslContext.getSocketFactory().createSocket(address.getHostName(), address.getPort());
+        }
+
+        if (config.getSocketTimeout() != null) {
+            socket.setSoTimeout(config.getSocketTimeout());
+        }
+
+        if (config.getReuseAddress() != null) {
+            socket.setReuseAddress(config.getReuseAddress());
+        }
+
+        if (config.getReceiveBufferSize() != null) {
+            socket.setReceiveBufferSize(config.getReceiveBufferSize());
+        }
+
+        if (config.getSendBufferSize() != null) {
+            socket.setSendBufferSize(config.getSendBufferSize());
+        }
+
+        if (config.getTrafficClass() != null) {
+            socket.setTrafficClass(config.getTrafficClass());
+        }
+
+        if (config.getKeepAlive() != null) {
+            socket.setKeepAlive(config.getKeepAlive());
+        }
+
+        if (config.getOobInline() != null) {
+            socket.setOOBInline(config.getOobInline());
+        }
+
+        if (config.getTcpNoDelay() != null) {
+            socket.setTcpNoDelay(config.getTcpNoDelay());
+        }
+
+        return socket;
+    }
+
+    public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config)
+            throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException {
+        if (config == null) {
+            throw new NullPointerException("Configuration may not be null.");
+        }
+
+        final SSLContext sslContext = config.createSSLContext();
+        final ServerSocket serverSocket;
+        if (sslContext == null) {
+            serverSocket = new ServerSocket(port);
+        } else {
+            serverSocket = sslContext.getServerSocketFactory().createServerSocket(port);
+            ((SSLServerSocket) serverSocket).setNeedClientAuth(config.getNeedClientAuth());
+        }
+
+        if (config.getSocketTimeout() != null) {
+            serverSocket.setSoTimeout(config.getSocketTimeout());
+        }
+
+        if (config.getReuseAddress() != null) {
+            serverSocket.setReuseAddress(config.getReuseAddress());
+        }
+
+        if (config.getReceiveBufferSize() != null) {
+            serverSocket.setReceiveBufferSize(config.getReceiveBufferSize());
+        }
+
+        return serverSocket;
+    }
+
+    public static void closeQuietly(final Socket socket) {
+        if (socket == null) {
+            return;
+        }
+
+        try {
+            try {
+                // can't shut down input/output individually with secure sockets
+                if ((socket instanceof SSLSocket) == false) {
+                    if (socket.isInputShutdown() == false) {
+                        socket.shutdownInput();
+                    }
+                    if (socket.isOutputShutdown() == false) {
+                        socket.shutdownOutput();
+                    }
+                }
+            } finally {
+                if (socket.isClosed() == false) {
+                    socket.close();
+                }
+            }
+        } catch (final Exception ex) {
+            logger.debug("Failed to close socket due to: " + ex, ex);
+        }
+    }
+
+    public static void closeQuietly(final ServerSocket serverSocket) {
+        if (serverSocket == null) {
+            return;
+        }
+
+        try {
+            serverSocket.close();
+        } catch (final Exception ex) {
+            logger.debug("Failed to close server socket due to: " + ex, ex);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
new file mode 100644
index 0000000..fc817e9
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A service that may be discovered at runtime. A service is defined as having a
+ * unique case-sensitive service name and a socket address where it is
+ * available.
+ *
+ */
+public interface DiscoverableService {
+
+    /**
+     * The service's name. Two services are considered equal if they have the
+     * same case-sensitive service name.
+     *
+     * @return the service's name
+     */
+    String getServiceName();
+
+    /**
+     * @return the service's address
+     */
+    InetSocketAddress getServiceAddress();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
new file mode 100644
index 0000000..3737e95
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.net.InetSocketAddress;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * A basic implementation of the DiscoverableService interface. Two services are
+ * considered equal if they have the same case-sensitive service name.
+ *
+ */
+public class DiscoverableServiceImpl implements DiscoverableService {
+
+    private final String serviceName;
+
+    private final InetSocketAddress serviceAddress;
+
+    public DiscoverableServiceImpl(final String serviceName, final InetSocketAddress serviceAddress) {
+        if (StringUtils.isBlank(serviceName)) {
+            throw new IllegalArgumentException("Service name may not be null or empty.");
+        } else if (serviceAddress == null) {
+            throw new IllegalArgumentException("Service address may not be null.");
+        }
+        this.serviceName = serviceName;
+        this.serviceAddress = serviceAddress;
+    }
+
+    @Override
+    public InetSocketAddress getServiceAddress() {
+        return serviceAddress;
+    }
+
+    @Override
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[Discoverable Service: %s available at %s:%d]", serviceName, serviceAddress.getHostName(), serviceAddress.getPort());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof DiscoverableService)) {
+            return false;
+        }
+        final DiscoverableService other = (DiscoverableService) obj;
+        return !((this.serviceName == null) ? (other.getServiceName() != null) : !this.serviceName.equals(other.getServiceName()));
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 5;
+        hash = 53 * hash + (this.serviceName != null ? this.serviceName.hashCode() : 0);
+        return hash;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
new file mode 100644
index 0000000..d1c2156
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+/**
+ */
+public final class MulticastConfiguration {
+
+    private MulticastTimeToLive ttl = DEFAULT_MULTICAST_TTL;
+
+    private Integer socketTimeout;
+
+    private Integer receiveBufferSize;
+
+    private Integer sendBufferSize;
+
+    private Boolean reuseAddress;
+
+    private Integer trafficClass;
+
+    private Boolean loopbackMode;
+
+    public static final MulticastTimeToLive DEFAULT_MULTICAST_TTL = MulticastTimeToLive.SAME_SUBNET;
+
+    public MulticastTimeToLive getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(final MulticastTimeToLive ttl) {
+        if (ttl == null) {
+            throw new NullPointerException("Multicast TTL may not be null.");
+        }
+        this.ttl = ttl;
+    }
+
+    public Integer getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(Integer socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    public Boolean getReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(Boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+    public Integer getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(Integer receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public Integer getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(Integer sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public Integer getTrafficClass() {
+        return trafficClass;
+    }
+
+    public void setTrafficClass(Integer trafficClass) {
+        this.trafficClass = trafficClass;
+    }
+
+    public Boolean getLoopbackMode() {
+        return loopbackMode;
+    }
+
+    public void setLoopbackMode(Boolean loopbackMode) {
+        this.loopbackMode = loopbackMode;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
new file mode 100644
index 0000000..1ce2ea0
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a listener for protocol messages sent over multicast. If a message
+ * is of type MulticastProtocolMessage, then the underlying protocol message is
+ * passed to the handler. If the receiving handler produces a message response,
+ * then the message is wrapped with a MulticastProtocolMessage before being sent
+ * to the originator.
+ *
+ */
+public abstract class MulticastListener {
+
+    // constants
+    private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5;
+    private static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 512;
+
+    private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastListener.class));
+
+    // immutable members
+    private final int numThreads;
+    private final InetSocketAddress multicastAddress;
+    private final MulticastConfiguration configuration;
+
+    private volatile ExecutorService executorService;     // volatile to guarantee most current value is visible
+    private volatile MulticastSocket multicastSocket;     // volatile to guarantee most current value is visible
+
+    private int shutdownListenerSeconds = DEFAULT_SHUTDOWN_LISTENER_SECONDS;
+    private int maxPacketSizeBytes = DEFAULT_MAX_PACKET_SIZE_BYTES;
+
+    public MulticastListener(
+            final int numThreads,
+            final InetSocketAddress multicastAddress,
+            final MulticastConfiguration configuration) {
+
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("Number of threads may not be less than or equal to zero.");
+        } else if (multicastAddress == null) {
+            throw new IllegalArgumentException("Multicast address may not be null.");
+        } else if (multicastAddress.getAddress().isMulticastAddress() == false) {
+            throw new IllegalArgumentException("Multicast group must be a Class D address.");
+        } else if (configuration == null) {
+            throw new IllegalArgumentException("Multicast configuration may not be null.");
+        }
+
+        this.numThreads = numThreads;
+        this.multicastAddress = multicastAddress;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Implements the action to perform when a new datagram is received.
+     * Implementations of this method must not close the multicast socket.
+     *
+     * @param multicastSocket socket
+     * @param packet the datagram packet
+     */
+    public abstract void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet);
+
+    public void start() throws IOException {
+
+        if (isRunning()) {
+            return;
+        }
+
+        multicastSocket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration);
+        multicastSocket.joinGroup(multicastAddress.getAddress());
+
+        executorService = Executors.newFixedThreadPool(numThreads);
+
+        final ExecutorService runnableExecServiceRef = executorService;
+        final MulticastSocket runnableMulticastSocketRef = multicastSocket;
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (runnableExecServiceRef.isShutdown() == false) {
+                    try {
+                        final byte[] buf = new byte[maxPacketSizeBytes];
+                        final DatagramPacket packet = new DatagramPacket(buf, maxPacketSizeBytes);
+                        runnableMulticastSocketRef.receive(packet);
+                        runnableExecServiceRef.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                dispatchRequest(multicastSocket, packet);
+                            }
+                        });
+                    } catch (final SocketException | SocketTimeoutException ste) {
+                        /* ignore so that we can accept connections in approximately a non-blocking fashion */
+                    } catch (final Exception e) {
+                        logger.warn("Cluster protocol receiver encountered exception: " + e, e);
+                    }
+                }
+            }
+        }).start();
+    }
+
+    public boolean isRunning() {
+        return (executorService != null && executorService.isShutdown() == false);
+    }
+
+    public void stop() throws IOException {
+
+        if (isRunning() == false) {
+            return;
+        }
+
+        // shutdown executor service
+        try {
+            if (getShutdownListenerSeconds() <= 0) {
+                executorService.shutdownNow();
+            } else {
+                executorService.shutdown();
+            }
+            executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS);
+        } catch (final InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        } finally {
+            if (executorService.isTerminated()) {
+                logger.info("Multicast Listener has been terminated successfully.");
+            } else {
+                logger.warn("Multicast Listener has not terminated properly.  There exists an uninterruptable thread that will take an indeterminate amount of time to stop.");
+            }
+        }
+
+        // shutdown server socket
+        if (multicastSocket.isClosed() == false) {
+            multicastSocket.leaveGroup(multicastAddress.getAddress());
+            multicastSocket.close();
+        }
+
+    }
+
+    public int getShutdownListenerSeconds() {
+        return shutdownListenerSeconds;
+    }
+
+    public void setShutdownListenerSeconds(final int shutdownListenerSeconds) {
+        this.shutdownListenerSeconds = shutdownListenerSeconds;
+    }
+
+    public int getMaxPacketSizeBytes() {
+        return maxPacketSizeBytes;
+    }
+
+    public void setMaxPacketSizeBytes(int maxPacketSizeBytes) {
+        if (maxPacketSizeBytes <= 0) {
+            throw new IllegalArgumentException("Max packet size must be greater than zero bytes.");
+        }
+        this.maxPacketSizeBytes = maxPacketSizeBytes;
+    }
+
+    public MulticastConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public InetSocketAddress getMulticastAddress() {
+        return multicastAddress;
+    }
+
+    public int getNumThreads() {
+        return numThreads;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
new file mode 100644
index 0000000..212e20c
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Defines the interface for discovering services based on name. Services are
+ * expected to be exposed via socket address and port.
+ *
+ */
+public interface MulticastServiceDiscovery extends ServiceDiscovery {
+
+    /**
+     * @return the multicast address
+     */
+    InetSocketAddress getMulticastAddress();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
new file mode 100644
index 0000000..7ef5a5d
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Defines the interface for broadcasting a service via multicast.
+ *
+ */
+public interface MulticastServicesBroadcaster extends ServicesBroadcaster {
+
+    /**
+     * @return the multicast address
+     */
+    InetSocketAddress getMulticastAddress();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
new file mode 100644
index 0000000..3e96c61
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+/**
+ */
+public enum MulticastTimeToLive {
+
+    SAME_HOST(0),
+    SAME_SUBNET(1),
+    SAME_SITE(32),
+    SAME_REGION(64),
+    SAME_CONTINENT(128),
+    UNRESTRICTED(255);
+
+    private final int ttl;
+
+    MulticastTimeToLive(final int ttl) {
+        this.ttl = ttl;
+    }
+
+    public int getTtl() {
+        return ttl;
+    }
+
+    public MulticastTimeToLive valueOfByTtl(final int ttl) {
+        for (final MulticastTimeToLive value : values()) {
+            if (value.getTtl() == ttl) {
+                return value;
+            }
+        }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
new file mode 100644
index 0000000..84d76d2
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers for creating, configuring and quietly closing
+ * {@link MulticastSocket}s.
+ */
+public final class MulticastUtils {
+
+    private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastUtils.class));
+
+    /**
+     * Creates a multicast socket without requesting a specific port and
+     * applies the given configuration to it.
+     *
+     * @param config the settings to apply; may not be null
+     * @return the configured socket
+     * @throws IOException if the socket cannot be created or configured
+     * @throws IllegalArgumentException if {@code config} is null
+     */
+    public static MulticastSocket createMulticastSocket(final MulticastConfiguration config) throws IOException {
+        return createMulticastSocket(0, config);
+    }
+
+    /**
+     * Creates a multicast socket and applies the given configuration to it.
+     * If applying the configuration fails, the socket is closed before the
+     * failure is propagated so that it does not leak.
+     *
+     * @param port the port to bind to; a value of zero or less uses the
+     * no-argument {@link MulticastSocket} constructor instead
+     * @param config the settings to apply; may not be null
+     * @return the configured socket
+     * @throws IOException if the socket cannot be created or configured
+     * @throws IllegalArgumentException if {@code config} is null
+     */
+    public static MulticastSocket createMulticastSocket(final int port, final MulticastConfiguration config) throws IOException {
+        if (config == null) {
+            throw new IllegalArgumentException("Configuration may not be null.");
+        }
+
+        final MulticastSocket socket;
+        if (port <= 0) {
+            socket = new MulticastSocket();
+        } else {
+            socket = new MulticastSocket(port);
+        }
+
+        try {
+            applyConfiguration(socket, config);
+        } catch (final IOException | RuntimeException e) {
+            // The socket is already open at this point; release it rather than leaking it to the caller-less void.
+            closeQuietly(socket);
+            throw e;
+        }
+
+        return socket;
+    }
+
+    /**
+     * Copies the TTL and every non-null optional setting from the
+     * configuration onto the socket.
+     */
+    private static void applyConfiguration(final MulticastSocket socket, final MulticastConfiguration config) throws IOException {
+        socket.setTimeToLive(config.getTtl().getTtl());
+
+        if (config.getSocketTimeout() != null) {
+            socket.setSoTimeout(config.getSocketTimeout());
+        }
+
+        // NOTE(review): both constructors used above bind the socket, and the JDK documents the effect of
+        // changing SO_REUSEADDR after binding as undefined - confirm whether this setting is relied upon.
+        if (config.getReuseAddress() != null) {
+            socket.setReuseAddress(config.getReuseAddress());
+        }
+
+        if (config.getReceiveBufferSize() != null) {
+            socket.setReceiveBufferSize(config.getReceiveBufferSize());
+        }
+
+        if (config.getSendBufferSize() != null) {
+            socket.setSendBufferSize(config.getSendBufferSize());
+        }
+
+        if (config.getTrafficClass() != null) {
+            socket.setTrafficClass(config.getTrafficClass());
+        }
+
+        if (config.getLoopbackMode() != null) {
+            socket.setLoopbackMode(config.getLoopbackMode());
+        }
+    }
+
+    /**
+     * Closes the given socket, logging any failure at debug level instead of
+     * throwing it.
+     *
+     * @param socket the socket to close; a null socket is ignored
+     */
+    public static void closeQuietly(final MulticastSocket socket) {
+
+        if (socket == null) {
+            return;
+        }
+
+        try {
+            socket.close();
+        } catch (final Exception ex) {
+            logger.debug("Failed to close multicast socket due to: " + ex, ex);
+        }
+
+    }
+
+    /**
+     * Leaves the given multicast group and then closes the socket. A failure
+     * in either step is logged at debug level and does not prevent the other
+     * step from being attempted.
+     *
+     * @param socket the socket to close; a null socket is ignored
+     * @param groupAddress the multicast group to leave before closing
+     */
+    public static void closeQuietly(final MulticastSocket socket, final InetAddress groupAddress) {
+
+        if (socket == null) {
+            return;
+        }
+
+        try {
+            socket.leaveGroup(groupAddress);
+        } catch (final Exception ex) {
+            logger.debug("Failed to leave multicast group due to: " + ex, ex);
+        }
+
+        try {
+            socket.close();
+        } catch (final Exception ex) {
+            logger.debug("Failed to close multicast socket due to: " + ex, ex);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
new file mode 100644
index 0000000..74c1a30
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+/**
+ * A generic contract for components that discover a service and expose the
+ * result to callers.
+ *
+ */
+public interface ServiceDiscovery {
+
+    /**
+     * @return the service that was discovered (NOTE(review): whether this may
+     * be null before discovery succeeds is up to the implementation - confirm)
+     */
+    DiscoverableService getService();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
new file mode 100644
index 0000000..2a9f9b2
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.util.Set;
+
+/**
+ * Defines the interface for broadcasting a collection of services for client
+ * discovery.
+ *
+ */
+public interface ServicesBroadcaster {
+
+    /**
+     * @return the delay in milliseconds to wait between successive broadcasts
+     */
+    int getBroadcastDelayMs();
+
+    /**
+     * @return the set of services that are being broadcast
+     */
+    Set<DiscoverableService> getServices();
+
+    /**
+     * Adds the given service to the set of broadcasted services.
+     *
+     * @param service a service
+     * @return true if the service was added to the set; false if a service
+     * with the given service name already exists in the set
+     */
+    boolean addService(DiscoverableService service);
+
+    /**
+     * Removes the service with the given service name from the set.
+     *
+     * @param serviceName a service name
+     * @return true if the service was removed; false otherwise
+     */
+    boolean removeService(String serviceName);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
new file mode 100644
index 0000000..a266ade
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.example;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.io.nio.ChannelListener;
+import org.apache.nifi.io.nio.consumer.StreamConsumer;
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example server that registers a UDP listener on port 20000 and TCP
+ * listeners on ports 20001 and 20002, creates a {@link UselessStreamConsumer}
+ * for every stream id it is asked about, and shuts itself down after 30
+ * minutes.
+ */
+public final class ServerMain {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMain.class);
+
+    /**
+     * Pause between two sweeps over the consumer map. Without it the sweep
+     * loops would spin a CPU core at full load for the whole run.
+     */
+    private static final long SWEEP_INTERVAL_MILLIS = 50L;
+
+    /**
+     * Starts the listeners, periodically removes finished consumers for 30
+     * minutes (or until the thread is interrupted) and then shuts everything
+     * down.
+     *
+     * @param args ignored
+     * @throws IOException if a listener cannot be set up
+     */
+    public static void main(final String[] args) throws IOException {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+
+        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
+        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap = new ConcurrentHashMap<>();
+        final BufferPool bufferPool = new BufferPool(10, 5 << 20, false, 40.0);
+        ChannelListener listener = null;
+        try {
+            executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS);
+            listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS, false);
+            listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS);
+            listener.addDatagramChannel(null, 20000, 32 << 20);
+            LOGGER.info("Listening for UDP data on port 20000");
+            listener.addServerSocket(null, 20001, 64 << 20);
+            LOGGER.info("listening for TCP connections on port 20001");
+            listener.addServerSocket(null, 20002, 64 << 20);
+            LOGGER.info("listening for TCP connections on port 20002");
+            final Calendar endTime = Calendar.getInstance();
+            endTime.add(Calendar.MINUTE, 30);
+            while (true) {
+                processAllConsumers(consumerMap);
+                if (endTime.before(Calendar.getInstance()) || !pauseBetweenSweeps()) {
+                    break; // time to shut down, or the thread was interrupted
+                }
+            }
+        } finally {
+            if (listener != null) {
+                LOGGER.info("Shutting down server....");
+                listener.shutdown(1L, TimeUnit.SECONDS);
+                LOGGER.info("Consumer map size = " + consumerMap.size());
+                // Keep sweeping until every consumer has reported that it is finished.
+                while (consumerMap.size() > 0) {
+                    processAllConsumers(consumerMap);
+                    if (consumerMap.size() > 0 && !pauseBetweenSweeps()) {
+                        break; // interrupted: stop waiting for the remaining consumers
+                    }
+                }
+                LOGGER.info("Consumer map size = " + consumerMap.size());
+            }
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Sleeps for one sweep interval.
+     *
+     * @return true if the pause completed; false if the thread was
+     * interrupted, in which case the interrupt flag is restored
+     */
+    private static boolean pauseBetweenSweeps() {
+        try {
+            Thread.sleep(SWEEP_INTERVAL_MILLIS);
+            return true;
+        } catch (final InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Cancels the scheduled task of every consumer that reports it is
+     * finished and removes that consumer from the map.
+     *
+     * @param consumerMap the live consumers mapped to their scheduled tasks
+     */
+    private static void processAllConsumers(final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
+        final Set<StreamConsumer> deadConsumers = new HashSet<>();
+        for (final Map.Entry<StreamConsumer, ScheduledFuture<?>> entry : consumerMap.entrySet()) {
+            if (entry.getKey().isConsumerFinished()) {
+                entry.getValue().cancel(true);
+                deadConsumers.add(entry.getKey());
+            }
+        }
+        for (final StreamConsumer consumer : deadConsumers) {
+            LOGGER.debug("removing consumer " + consumer);
+            consumerMap.remove(consumer);
+        }
+    }
+
+    /**
+     * Runnable that invokes {@link StreamConsumer#process()} once per run
+     * unless the consumer is already finished.
+     */
+    public static final class ConsumerRunner implements Runnable {
+
+        private final StreamConsumer consumer;
+
+        public ConsumerRunner(final StreamConsumer consumer) {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void run() {
+            if (consumer.isConsumerFinished()) {
+                return;
+            }
+            try {
+                consumer.process();
+            } catch (IOException ex) {
+                LOGGER.error("Consumer " + consumer + " failed while processing", ex);
+            }
+        }
+    }
+
+    /**
+     * Factory that creates a {@link UselessStreamConsumer} per stream id,
+     * schedules it on the shared executor every 10 milliseconds and records
+     * it in the shared consumer map.
+     */
+    public static final class ExampleStreamConsumerFactory implements StreamConsumerFactory {
+
+        final ScheduledExecutorService executor;
+        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap;
+
+        public ExampleStreamConsumerFactory(final ScheduledExecutorService executor, final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
+            this.executor = executor;
+            this.consumerMap = consumerMap;
+        }
+
+        @Override
+        public StreamConsumer newInstance(final String streamId) {
+            final StreamConsumer consumer = new UselessStreamConsumer(streamId);
+            final ScheduledFuture<?> future = executor.scheduleWithFixedDelay(new ConsumerRunner(consumer), 0L, 10L, TimeUnit.MILLISECONDS);
+            consumerMap.put(consumer, future);
+            LOGGER.info("Added consumer: " + consumer);
+            return consumer;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
new file mode 100644
index 0000000..1c4b70c
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.example;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example client that opens TCP connections to localhost ports 20001 and
+ * 20002 from two threads and pushes zero-filled data through each connection.
+ */
+public class TCPClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(TCPClient.class);
+
+    /**
+     * Starts one sender thread per target port; both share the same payload.
+     *
+     * @param args ignored
+     * @throws Exception declared for convenience
+     */
+    public static void main(final String[] args) throws Exception {
+        final byte[] payload = TCPClient.makeBytes();
+        final Thread first = newSenderThread(20001, payload);
+        final Thread second = newSenderThread(20002, payload);
+        first.start();
+        second.start();
+    }
+
+    /**
+     * Builds (but does not start) a thread that sends the payload to the
+     * given port ten times in a row, logging any failure.
+     */
+    private static Thread newSenderThread(final int port, final byte[] payload) {
+        return new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    int roundsLeft = 10;
+                    while (roundsLeft-- > 0) {
+                        sendData(port, payload);
+                    }
+                } catch (Exception e) {
+                    logger.error("Blew exception", e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @return a new zero-filled array of {@code 2 << 20} bytes
+     */
+    public static byte[] makeBytes() {
+        return new byte[2 << 20];
+    }
+
+    /**
+     * Connects to localhost on the given port, waits five seconds, writes the
+     * payload 1000 times, flushes, closes the socket and logs the byte count.
+     */
+    private static void sendData(final int port, final byte[] bytes) throws SocketException, IOException, InterruptedException {
+        long sent = 0L;
+        try (Socket sock = new Socket("localhost", port)) {
+            sock.setTcpNoDelay(true);
+            sock.setSoTimeout(2000);
+            logger.info("socket established " + sock + " to port " + port + " now waiting 5 seconds to send anything...");
+            Thread.sleep(5000L);
+            int writesLeft = 1000;
+            while (writesLeft-- > 0) {
+                sock.getOutputStream().write(bytes);
+                sent += bytes.length;
+            }
+            sock.getOutputStream().flush();
+        }
+        logger.info("Total bytes sent: " + sent + " to port " + port);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
new file mode 100644
index 0000000..00a00a1
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.example;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example client that sends 819200 datagrams of 128 zero bytes each to
+ * localhost port 20000 and logs how long sending took.
+ */
+public class UDPClient {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UDPClient.class);
+
+    /**
+     * Sends the datagrams and logs the elapsed time. The socket is opened in
+     * a try-with-resources block so that it is closed even if a send fails.
+     *
+     * @param args ignored
+     * @throws Exception if the socket cannot be opened or a send fails
+     */
+    public static void main(final String[] args) throws Exception {
+        final byte[] buffer = UDPClient.makeBytes();
+        final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, new InetSocketAddress("localhost", 20000));
+        try (DatagramSocket socket = new DatagramSocket()) {
+            final long startTime = System.nanoTime();
+            for (int i = 0; i < 819200; i++) { // 100 MB
+                socket.send(packet);
+            }
+            final long endTime = System.nanoTime();
+            final long durationMillis = (endTime - startTime) / 1000000;
+            LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis);
+        }
+    }
+
+    /**
+     * @return a new zero-filled array of 128 bytes used as the datagram payload
+     */
+    public static byte[] makeBytes() {
+        byte[] bytes = new byte[128];
+        return bytes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
new file mode 100644
index 0000000..107c087
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.example;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.nifi.io.nio.consumer.AbstractStreamConsumer;
+
+/**
+ * Example stream consumer that does nothing with the data it is given. It
+ * only exists so the example server has a consumer to hand out.
+ */
+public class UselessStreamConsumer extends AbstractStreamConsumer {
+
+    public UselessStreamConsumer(final String id) {
+        super(id);
+    }
+
+    /**
+     * Intentionally empty: the buffer contents are ignored.
+     */
+    @Override
+    protected void processBuffer(final ByteBuffer buffer) throws IOException {
+    }
+
+    /**
+     * Prints a marker line to standard error. NOTE(review): presumably the
+     * base class calls this once the stream is finished - confirm against
+     * AbstractStreamConsumer.
+     */
+    @Override
+    protected void onConsumerDone() {
+        System.err.println("IN consumer done");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/src/test/resources/log4j.xml b/nifi-commons/nifi-socket-utils/src/test/resources/log4j.xml
new file mode 100644
index 0000000..8e93769
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/src/test/resources/log4j.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <!-- Appender for printing formatted log statements to the console. -->
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p [%t] %40.40c - %m%n"/>
+        </layout>
+    </appender>
+
+    <!-- Logger for managing logging statements for nifi -->
+    <logger name="nifi">
+        <level value="debug"/>
+    </logger>
+
+    <root>
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+    </root>
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/.gitignore b/nifi-commons/nifi-utils/.gitignore
new file mode 100755
index 0000000..12c5231
--- /dev/null
+++ b/nifi-commons/nifi-utils/.gitignore
@@ -0,0 +1,8 @@
+/target
+/target
+/target
+/target
+/target
+/target
+/target
+/target

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/pom.xml b/nifi-commons/nifi-utils/pom.xml
new file mode 100644
index 0000000..6927fc0
--- /dev/null
+++ b/nifi-commons/nifi-utils/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-utils</artifactId>
+    <version>0.3.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <!--
+    This project intentionally has no additional dependencies beyond those pulled in by the parent.  It is a general purpose utility library
+    and should keep its dependency footprint minimal.
+    -->
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
new file mode 100644
index 0000000..9b4c3af
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+/**
+ * The attribute keys defined here for FlowFiles. Each constant carries the
+ * string key under which the corresponding attribute is stored.
+ */
+public enum CoreAttributes implements FlowFileAttributeKey {
+
+    /**
+     * The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename
+     */
+    PATH("path"),
+    /**
+     * The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename
+     */
+    ABSOLUTE_PATH("absolute.path"),
+    /**
+     * The filename of the FlowFile. The filename should not contain any directory structure.
+     */
+    FILENAME("filename"),
+    /**
+     * A unique UUID assigned to this FlowFile
+     */
+    UUID("uuid"),
+    /**
+     * A numeric value indicating the FlowFile priority
+     */
+    PRIORITY("priority"),
+    /**
+     * The MIME Type of this FlowFile
+     */
+    MIME_TYPE("mime.type"),
+    /**
+     * Specifies the reason that a FlowFile is being discarded
+     */
+    DISCARD_REASON("discard.reason"),
+    /**
+     * Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
+     */
+    ALTERNATE_IDENTIFIER("alternate.identifier");
+
+    private final String key;
+
+    private CoreAttributes(final String key) {
+        this.key = key;
+    }
+
+    /**
+     * @return the string key of this attribute, for example {@code "mime.type"} for {@link #MIME_TYPE}
+     */
+    @Override
+    public String key() {
+        return key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
new file mode 100644
index 0000000..9637631
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+/**
+ * A type that can supply the string key of a FlowFile attribute.
+ */
+public interface FlowFileAttributeKey {
+
+    /**
+     * @return the string key identifying the attribute
+     */
+    String key();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
new file mode 100644
index 0000000..77c34c9
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * {@link VersionNegotiator} backed by a fixed list of versions supplied at construction time.
+ * <p>
+ * The versions are kept in the order in which they were given. The first one is both the preferred version and the initially selected version, and
+ * {@link #getPreferredVersion(int)} returns the first listed version that does not exceed the requested maximum.
+ */
+public class StandardVersionNegotiator implements VersionNegotiator {
+
+    private final List<Integer> versions;
+    private int curVersion;
+
+    /**
+     * @param supportedVersions the supported versions, most preferred first; must be non-null and non-empty
+     * @throws NullPointerException if supportedVersions is null
+     * @throws IllegalArgumentException if supportedVersions is empty
+     */
+    public StandardVersionNegotiator(final int... supportedVersions) {
+        final int count = Objects.requireNonNull(supportedVersions).length;
+        if (count == 0) {
+            throw new IllegalArgumentException("At least one version must be supported");
+        }
+
+        final List<Integer> boxed = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            boxed.add(supportedVersions[i]);
+        }
+
+        versions = Collections.unmodifiableList(boxed);
+        curVersion = supportedVersions[0];
+    }
+
+    @Override
+    public int getVersion() {
+        return curVersion;
+    }
+
+    @Override
+    public void setVersion(final int version) throws IllegalArgumentException {
+        if (isVersionSupported(version)) {
+            curVersion = version;
+            return;
+        }
+
+        throw new IllegalArgumentException("Version " + version + " is not supported");
+    }
+
+    @Override
+    public int getPreferredVersion() {
+        // the head of the list is the preferred version
+        return versions.get(0).intValue();
+    }
+
+    @Override
+    public Integer getPreferredVersion(final int maxVersion) {
+        // walk the list in preference order and stop at the first version that fits under the cap
+        for (int i = 0; i < versions.size(); i++) {
+            final Integer candidate = versions.get(i);
+            if (candidate <= maxVersion) {
+                return candidate;
+            }
+        }
+
+        // nothing at or below maxVersion
+        return null;
+    }
+
+    @Override
+    public boolean isVersionSupported(final int version) {
+        return versions.contains(Integer.valueOf(version));
+    }
+
+    /**
+     * @return an unmodifiable view of the supported versions, in the order given to the constructor
+     */
+    @Override
+    public List<Integer> getSupportedVersions() {
+        return versions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
new file mode 100644
index 0000000..d8ee27a
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.List;
+
+/**
+ * Describes a resource that supports one or more integer versions and keeps track of which of them is currently selected.
+ */
+public interface VersionNegotiator {
+
+    /**
+     * @return the currently configured Version of this resource
+     */
+    int getVersion();
+
+    /**
+     * Sets the version of this resource to the specified version. Only the lower byte of the version is relevant.
+     *
+     * @param version the version to set
+     * @throws IllegalArgumentException if the given Version is not supported by this resource, as is indicated by the {@link #isVersionSupported(int)} method
+     */
+    void setVersion(int version) throws IllegalArgumentException;
+
+    /**
+     * @return the Version of this resource that is preferred
+     */
+    int getPreferredVersion();
+
+    /**
+     * Gets the preferred version of this resource that is no greater than the given maxVersion. If no acceptable version exists that is less than or equal to <code>maxVersion</code>, then
+     * <code>null</code> is returned
+     *
+     * @param maxVersion the maximum version desired (inclusive)
+     * @return the preferred version if found; null otherwise
+     */
+    Integer getPreferredVersion(int maxVersion);
+
+    /**
+     * Indicates whether or not the specified version is supported by this resource
+     *
+     * @param version the version to test
+     * @return true if supported; false otherwise
+     */
+    boolean isVersionSupported(int version);
+
+    /**
+     * NOTE(review): this interface does not specify the ordering or mutability of the returned list; StandardVersionNegotiator returns an unmodifiable list in constructor order - confirm
+     * before relying on either property.
+     *
+     * @return the versions supported by this resource
+     */
+    List<Integer> getSupportedVersions();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
new file mode 100644
index 0000000..d18c807
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Indicates that the user disabled transmission while communications were taking place with a peer.
+ * <p>
+ * This is an unchecked exception. It declares no constructors of its own, so instances carry neither a detail message nor a cause.
+ */
+public class TransmissionDisabledException extends RuntimeException {
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
new file mode 100644
index 0000000..6434b2d
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+public class CompressionInputStream extends InputStream {
+
+    private final InputStream in;
+    private final Inflater inflater;
+
+    private byte[] compressedBuffer;
+    private byte[] buffer;
+
+    private int bufferIndex;
+    private boolean eos = false;    // whether or not we've reached the end of stream
+    private boolean allDataRead = false;    // different from eos b/c eos means allDataRead == true && buffer is empty
+
+    private final byte[] fourByteBuffer = new byte[4];
+
+    public CompressionInputStream(final InputStream in) {
+        this.in = in;
+        inflater = new Inflater();
+
+        buffer = new byte[0];
+        compressedBuffer = new byte[0];
+        bufferIndex = 1;
+    }
+
+    private String toHex(final byte[] array) {
+        final StringBuilder sb = new StringBuilder("0x");
+        for (final byte b : array) {
+            final String hex = Integer.toHexString(b).toUpperCase();
+            if (hex.length() == 1) {
+                sb.append("0");
+            }
+            sb.append(hex);
+        }
+        return sb.toString();
+    }
+
+    protected void readChunkHeader() throws IOException {
+        // Ensure that we have a valid SYNC chunk
+        fillBuffer(fourByteBuffer);
+        if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) {
+            throw new IOException("Invalid CompressionInputStream. Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer));
+        }
+
+        // determine the size of the decompressed buffer
+        fillBuffer(fourByteBuffer);
+        buffer = new byte[toInt(fourByteBuffer)];
+
+        // determine the size of the compressed buffer
+        fillBuffer(fourByteBuffer);
+        compressedBuffer = new byte[toInt(fourByteBuffer)];
+
+        bufferIndex = buffer.length;  // indicate that buffer is empty
+    }
+
+    private int toInt(final byte[] data) {
+        return ((data[0] & 0xFF) << 24)
+                | ((data[1] & 0xFF) << 16)
+                | ((data[2] & 0xFF) << 8)
+                | (data[3] & 0xFF);
+    }
+
+    protected void bufferAndDecompress() throws IOException {
+        if (allDataRead) {
+            eos = true;
+            return;
+        }
+
+        readChunkHeader();
+        fillBuffer(compressedBuffer);
+
+        inflater.setInput(compressedBuffer);
+        try {
+            inflater.inflate(buffer);
+        } catch (final DataFormatException e) {
+            throw new IOException(e);
+        }
+        inflater.reset();
+
+        bufferIndex = 0;
+        final int moreDataByte = in.read();
+        if (moreDataByte < 1) {
+            allDataRead = true;
+        } else if (moreDataByte > 1) {
+            throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte);
+        }
+    }
+
+    private void fillBuffer(final byte[] buffer) throws IOException {
+        int len;
+        int bytesLeft = buffer.length;
+        int bytesRead = 0;
+        while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) {
+            bytesLeft -= len;
+            bytesRead += len;
+        }
+
+        if (bytesRead < buffer.length) {
+            throw new EOFException();
+        }
+    }
+
+    private boolean isBufferEmpty() {
+        return bufferIndex >= buffer.length;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (eos) {
+            return -1;
+        }
+
+        if (isBufferEmpty()) {
+            bufferAndDecompress();
+        }
+
+        if (isBufferEmpty()) {
+            eos = true;
+            return -1;
+        }
+
+        return buffer[bufferIndex++] & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (eos) {
+            return -1;
+        }
+
+        if (isBufferEmpty()) {
+            bufferAndDecompress();
+        }
+
+        if (isBufferEmpty()) {
+            eos = true;
+            return -1;
+        }
+
+        final int free = buffer.length - bufferIndex;
+        final int bytesToTransfer = Math.min(len, free);
+        System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer);
+        bufferIndex += bytesToTransfer;
+
+        return bytesToTransfer;
+    }
+
+    /**
+     * Does nothing. Does NOT close underlying InputStream
+     *
+     * @throws java.io.IOException for any issues closing underlying stream
+     */
+    @Override
+    public void close() throws IOException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
new file mode 100644
index 0000000..525b5b1
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+
+/**
+ * Buffers written bytes and emits them to the wrapped stream as independently deflated chunks.
+ * <p>
+ * Each chunk is written as the 4 {@link #SYNC_BYTES}, a 4-byte big-endian count of uncompressed bytes, a 4-byte big-endian count of compressed bytes, and the compressed bytes. Every
+ * chunk after the first is preceded by a single byte with value 1, and {@link #close()} terminates the sequence with a single byte with value 0.
+ * <p>
+ * The wrapped stream is flushed but never closed by this class. This class performs no synchronization of its own.
+ */
+public class CompressionOutputStream extends OutputStream {
+
+    public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'};
+
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+    public static final int DEFAULT_BUFFER_SIZE = 64 << 10;
+    public static final int MIN_BUFFER_SIZE = 8 << 10;
+
+    private final OutputStream out;
+    private final Deflater deflater;
+
+    private final byte[] buffer;
+    private byte[] compressed;    // grown on demand when a chunk does not compress
+
+    private int bufferIndex = 0;
+    private boolean dataWritten = false;
+    private boolean closed = false;
+
+    /**
+     * Creates a stream with {@link #DEFAULT_BUFFER_SIZE}, {@link #DEFAULT_COMPRESSION_LEVEL} and the default deflate strategy.
+     *
+     * @param outStream the stream to write chunks to
+     */
+    public CompressionOutputStream(final OutputStream outStream) {
+        this(outStream, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * @param outStream the stream to write chunks to
+     * @param bufferSize the maximum number of uncompressed bytes per chunk; at least {@link #MIN_BUFFER_SIZE}
+     */
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize) {
+        this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY);
+    }
+
+    /**
+     * @param outStream the stream to write chunks to
+     * @param bufferSize the maximum number of uncompressed bytes per chunk; at least {@link #MIN_BUFFER_SIZE}
+     * @param level the compression level handed to the {@link Deflater}
+     * @param strategy the compression strategy handed to the {@link Deflater}
+     * @throws IllegalArgumentException if bufferSize is smaller than {@link #MIN_BUFFER_SIZE}
+     */
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) {
+        if (bufferSize < MIN_BUFFER_SIZE) {
+            throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE);
+        }
+
+        this.out = outStream;
+        this.deflater = new Deflater(level);
+        this.deflater.setStrategy(strategy);
+        buffer = new byte[bufferSize];
+        compressed = new byte[bufferSize + 64];
+    }
+
+    /**
+     * Compresses the currently buffered chunk of data and sends it to the output stream. Does nothing if no data is buffered.
+     *
+     * @throws IOException if issues occur writing to stream
+     */
+    protected void compressAndWrite() throws IOException {
+        if (bufferIndex <= 0) {
+            return;
+        }
+
+        deflater.setInput(buffer, 0, bufferIndex);
+        deflater.finish();
+
+        // The compressed length goes into the header ahead of the data, so the whole chunk has to be deflated before anything is written.
+        // Incompressible input can come out larger than the initial output array; keep deflating into a bigger array until the deflater reports
+        // that it is finished, rather than silently dropping whatever did not fit.
+        int compressedBytes = 0;
+        while (!deflater.finished()) {
+            if (compressedBytes == compressed.length) {
+                final byte[] larger = new byte[compressed.length + Math.max(64, compressed.length >> 3)];
+                System.arraycopy(compressed, 0, larger, 0, compressedBytes);
+                compressed = larger;
+            }
+            compressedBytes += deflater.deflate(compressed, compressedBytes, compressed.length - compressedBytes);
+        }
+        deflater.reset();
+
+        writeChunkHeader(compressedBytes);
+        out.write(compressed, 0, compressedBytes);
+
+        bufferIndex = 0;
+    }
+
+    /**
+     * Writes the continuation byte (for every chunk but the first), the sync marker, and the uncompressed and compressed lengths of the chunk that is about to be written.
+     */
+    private void writeChunkHeader(final int compressedBytes) throws IOException {
+        // If we have already written data, write out a '1' to indicate that we have more data; when we close
+        // the stream, we instead write a '0' to indicate that we are finished sending data.
+        if (dataWritten) {
+            out.write(1);
+        }
+        out.write(SYNC_BYTES);
+        dataWritten = true;
+
+        writeInt(out, bufferIndex);
+        writeInt(out, compressedBytes);
+    }
+
+    /**
+     * Writes val as four bytes, most significant first. OutputStream.write(int) keeps only the low 8 bits of its argument, so no masking is needed.
+     */
+    private void writeInt(final OutputStream out, final int val) throws IOException {
+        out.write(val >>> 24);
+        out.write(val >>> 16);
+        out.write(val >>> 8);
+        out.write(val);
+    }
+
+    protected boolean bufferFull() {
+        return bufferIndex >= buffer.length;
+    }
+
+    private void ensureOpen() throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        ensureOpen();
+        buffer[bufferIndex++] = (byte) (b & 0xFF);
+        if (bufferFull()) {
+            compressAndWrite();
+        }
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        ensureOpen();
+        int bytesLeft = len;
+        while (bytesLeft > 0) {
+            final int free = buffer.length - bufferIndex;
+            final int bytesThisIteration = Math.min(bytesLeft, free);
+            System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration);
+            bufferIndex += bytesThisIteration;
+
+            bytesLeft -= bytesThisIteration;
+            if (bufferFull()) {
+                compressAndWrite();
+            }
+        }
+    }
+
+    /**
+     * Emits any buffered data as a chunk and flushes the wrapped stream.
+     */
+    @Override
+    public void flush() throws IOException {
+        compressAndWrite();
+        // OutputStream.flush() is a no-op, so the wrapped stream has to be flushed explicitly for the chunk to actually leave.
+        out.flush();
+    }
+
+    /**
+     * Emits any buffered data, writes the terminating 0 byte, flushes the wrapped stream and releases the deflater. The wrapped stream is NOT closed. Calling this method again has no
+     * effect, so the terminator is written exactly once.
+     * <p>
+     * NOTE(review): if nothing was ever written, the output consists of the terminator byte alone, with no preceding chunk.
+     *
+     * @throws IOException if issues occur writing to stream
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+        closed = true;
+
+        try {
+            compressAndWrite();
+            out.write(0);   // indicate that the stream is finished.
+            out.flush();
+        } finally {
+            deflater.end();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
new file mode 100644
index 0000000..e03dfbf
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * Pass-through {@link InputStream} wrapper that can be switched off. Once {@link #interrupt()} has been called, every overridden operation throws
+ * {@link TransmissionDisabledException} instead of delegating to the wrapped stream.
+ * <p>
+ * The flag is only examined before delegating; a call that is already executing inside the wrapped stream is not affected by it.
+ */
+public class InterruptableInputStream extends InputStream {
+
+    private final InputStream in;
+    private volatile boolean interrupted = false;
+
+    public InterruptableInputStream(final InputStream in) {
+        this.in = in;
+    }
+
+    /**
+     * Marks this stream as interrupted. Nothing in this class ever clears the flag again.
+     */
+    public void interrupt() {
+        interrupted = true;
+    }
+
+    /**
+     * Common guard executed at the start of every delegating method.
+     *
+     * @throws TransmissionDisabledException if {@link #interrupt()} has been called
+     */
+    private void failIfInterrupted() {
+        if (!interrupted) {
+            return;
+        }
+
+        throw new TransmissionDisabledException();
+    }
+
+    @Override
+    public int read() throws IOException {
+        failIfInterrupted();
+        return in.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        failIfInterrupted();
+        return in.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        failIfInterrupted();
+        return in.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        failIfInterrupted();
+        return in.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+        failIfInterrupted();
+        return in.available();
+    }
+
+    /**
+     * Closes the wrapped stream, unless this stream has been interrupted, in which case the wrapped stream is left untouched and the exception is thrown instead.
+     */
+    @Override
+    public void close() throws IOException {
+        failIfInterrupted();
+        in.close();
+    }
+
+    @Override
+    public boolean markSupported() {
+        failIfInterrupted();
+        return in.markSupported();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        failIfInterrupted();
+        in.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        failIfInterrupted();
+        in.reset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
new file mode 100644
index 0000000..cba5be6
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * Pass-through {@link OutputStream} wrapper that can be switched off. Once {@link #interrupt()} has been called, every overridden operation throws
+ * {@link TransmissionDisabledException} instead of delegating to the wrapped stream.
+ * <p>
+ * The flag is only examined before delegating; a call that is already executing inside the wrapped stream is not affected by it.
+ */
+public class InterruptableOutputStream extends OutputStream {
+
+    private volatile boolean interrupted = false;
+    private final OutputStream out;
+
+    public InterruptableOutputStream(final OutputStream out) {
+        this.out = out;
+    }
+
+    /**
+     * Marks this stream as interrupted. Nothing in this class ever clears the flag again.
+     */
+    public void interrupt() {
+        interrupted = true;
+    }
+
+    /**
+     * Common guard executed at the start of every delegating method.
+     *
+     * @throws TransmissionDisabledException if {@link #interrupt()} has been called
+     */
+    private void failIfInterrupted() {
+        if (!interrupted) {
+            return;
+        }
+
+        throw new TransmissionDisabledException();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        failIfInterrupted();
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        failIfInterrupted();
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        failIfInterrupted();
+        out.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        failIfInterrupted();
+        out.flush();
+    }
+
+    /**
+     * Closes the wrapped stream, unless this stream has been interrupted, in which case the wrapped stream is left untouched and the exception is thrown instead.
+     */
+    @Override
+    public void close() throws IOException {
+        failIfInterrupted();
+        out.close();
+    }
+}


Mime
View raw message