nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mcgil...@apache.org
Subject [40/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall directory structure to make releasing nifi vs maven plugins easier
Date Mon, 19 Jan 2015 18:16:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
deleted file mode 100644
index 8a8b7c0..0000000
--- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author unattributed
- */
-public final class MulticastUtils {
-
-    private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastUtils.class));
-
-    public static MulticastSocket createMulticastSocket(final MulticastConfiguration config) throws IOException {
-        return createMulticastSocket(0, config);
-    }
-
-    public static MulticastSocket createMulticastSocket(final int port, final MulticastConfiguration config) throws IOException {
-        if (config == null) {
-            throw new IllegalArgumentException("Configuration may not be null.");
-        }
-
-        final MulticastSocket socket;
-        if (port <= 0) {
-            socket = new MulticastSocket();
-        } else {
-            socket = new MulticastSocket(port);
-        }
-        socket.setTimeToLive(config.getTtl().getTtl());
-
-        if (config.getSocketTimeout() != null) {
-            socket.setSoTimeout(config.getSocketTimeout());
-        }
-
-        if (config.getReuseAddress() != null) {
-            socket.setReuseAddress(config.getReuseAddress());
-        }
-
-        if (config.getReceiveBufferSize() != null) {
-            socket.setReceiveBufferSize(config.getReceiveBufferSize());
-        }
-
-        if (config.getSendBufferSize() != null) {
-            socket.setSendBufferSize(config.getSendBufferSize());
-        }
-
-        if (config.getTrafficClass() != null) {
-            socket.setTrafficClass(config.getTrafficClass());
-        }
-
-        if (config.getLoopbackMode() != null) {
-            socket.setLoopbackMode(config.getLoopbackMode());
-        }
-
-        return socket;
-    }
-
-    public static void closeQuietly(final MulticastSocket socket) {
-
-        if (socket == null) {
-            return;
-        }
-
-        try {
-            socket.close();
-        } catch (final Exception ex) {
-            logger.debug("Failed to close multicast socket due to: " + ex, ex);
-        }
-
-    }
-
-    public static void closeQuietly(final MulticastSocket socket, final InetAddress groupAddress) {
-
-        if (socket == null) {
-            return;
-        }
-
-        try {
-            socket.leaveGroup(groupAddress);
-        } catch (final Exception ex) {
-            logger.debug("Failed to leave multicast group due to: " + ex, ex);
-        }
-
-        try {
-            socket.close();
-        } catch (final Exception ex) {
-            logger.debug("Failed to close multicast socket due to: " + ex, ex);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
deleted file mode 100644
index 173146e..0000000
--- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-/**
- * Defines a generic interface for discovering services.
- *
- * @author unattributed
- */
-public interface ServiceDiscovery {
-
-    /**
-     * @return the discovered service
-     */
-    DiscoverableService getService();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
deleted file mode 100644
index 86260d8..0000000
--- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.socket.multicast;
-
-import java.util.Set;
-
-/**
- * Defines the interface for broadcasting a collection of services for client
- * discovery.
- *
- * @author unattributed
- */
-public interface ServicesBroadcaster {
-
-    /**
-     * @return the delay in milliseconds to wait between successive broadcasts
-     */
-    int getBroadcastDelayMs();
-
-    /**
-     * @return the broadcasted services
-     */
-    Set<DiscoverableService> getServices();
-
-    /**
-     * Adds the given service to the set of broadcasted services.
-     *
-     * @param service a service
-     * @return true if the service was added to the set; false if a service with
-     * the given service name already exists in the set.
-     */
-    boolean addService(DiscoverableService service);
-
-    /**
-     * Removes the service with the given service name from the set.
-     *
-     * @param serviceName a service name
-     * @return true if the service was removed; false otherwise
-     */
-    boolean removeService(String serviceName);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
deleted file mode 100644
index b5240c9..0000000
--- a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.example;
-
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.io.nio.BufferPool;
-import org.apache.nifi.io.nio.ChannelListener;
-import org.apache.nifi.io.nio.consumer.StreamConsumer;
-import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * @author none
- */
-public final class ServerMain {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ServerMain.class);
-
-    public static void main(final String[] args) throws IOException {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
-
-        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
-        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap = new ConcurrentHashMap<>();
-        final BufferPool bufferPool = new BufferPool(10, 5 << 20, false, 40.0);
-        ChannelListener listener = null;
-        try {
-            executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS);
-            listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS);
-            listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS);
-            listener.addDatagramChannel(null, 20000, 32 << 20);
-            LOGGER.info("Listening for UDP data on port 20000");
-            listener.addServerSocket(null, 20001, 64 << 20);
-            LOGGER.info("listening for TCP connections on port 20001");
-            listener.addServerSocket(null, 20002, 64 << 20);
-            LOGGER.info("listening for TCP connections on port 20002");
-            final Calendar endTime = Calendar.getInstance();
-            endTime.add(Calendar.MINUTE, 30);
-            while (true) {
-                processAllConsumers(consumerMap);
-                if (endTime.before(Calendar.getInstance())) {
-                    break; // time to shut down
-                }
-            }
-        } finally {
-            if (listener != null) {
-                LOGGER.info("Shutting down server....");
-                listener.shutdown(1L, TimeUnit.SECONDS);
-                LOGGER.info("Consumer map size = " + consumerMap.size());
-                while (consumerMap.size() > 0) {
-                    processAllConsumers(consumerMap);
-                }
-                LOGGER.info("Consumer map size = " + consumerMap.size());
-            }
-            executor.shutdown();
-        }
-    }
-
-    private static void processAllConsumers(final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
-        final Set<StreamConsumer> deadConsumers = new HashSet<>();
-        for (final Map.Entry<StreamConsumer, ScheduledFuture<?>> entry : consumerMap.entrySet()) {
-            if (entry.getKey().isConsumerFinished()) {
-                entry.getValue().cancel(true);
-                deadConsumers.add(entry.getKey());
-            }
-        }
-        for (final StreamConsumer consumer : deadConsumers) {
-            LOGGER.debug("removing consumer " + consumer);
-            consumerMap.remove(consumer);
-        }
-    }
-
-    public static final class ConsumerRunner implements Runnable {
-
-        private final StreamConsumer consumer;
-
-        public ConsumerRunner(final StreamConsumer consumer) {
-            this.consumer = consumer;
-        }
-
-        @Override
-        public void run() {
-            if (consumer.isConsumerFinished()) {
-                return;
-            }
-            try {
-                consumer.process();
-            } catch (IOException ex) {
-                LOGGER.error("", ex);
-            }
-        }
-    }
-
-    public static final class ExampleStreamConsumerFactory implements StreamConsumerFactory {
-
-        final ScheduledExecutorService executor;
-        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap;
-
-        public ExampleStreamConsumerFactory(final ScheduledExecutorService executor, final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
-            this.executor = executor;
-            this.consumerMap = consumerMap;
-        }
-
-        @Override
-        public StreamConsumer newInstance(final String streamId) {
-            final StreamConsumer consumer = new UselessStreamConsumer(streamId);
-            final ScheduledFuture<?> future = executor.scheduleWithFixedDelay(new ConsumerRunner(consumer), 0L, 10L, TimeUnit.MILLISECONDS);
-            consumerMap.put(consumer, future);
-            LOGGER.info("Added consumer: " + consumer);
-            return consumer;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
deleted file mode 100644
index b3d214e..0000000
--- a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.example;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author none
- */
-public class TCPClient {
-
-    private static final Logger logger = LoggerFactory.getLogger(TCPClient.class);
-
-    public static void main(final String[] args) throws Exception {
-        final byte[] bytes = TCPClient.makeBytes();
-        Thread first = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    for (int i = 0; i < 10; i++) {
-                        sendData(20001, bytes);
-                    }
-                } catch (Exception e) {
-                    logger.error("Blew exception", e);
-                }
-            }
-        });
-        Thread second = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    for (int i = 0; i < 10; i++) {
-                        sendData(20002, bytes);
-                    }
-                } catch (Exception e) {
-                    logger.error("Blew exception", e);
-                }
-            }
-        });
-        first.start();
-        second.start();
-    }
-
-    public static byte[] makeBytes() {
-        byte[] bytes = new byte[2 << 20];
-        return bytes;
-    }
-
-    private static void sendData(final int port, final byte[] bytes) throws SocketException, IOException, InterruptedException {
-        long totalBytes;
-        try (Socket sock = new Socket("localhost", port)) {
-            sock.setTcpNoDelay(true);
-            sock.setSoTimeout(2000);
-            totalBytes = 0L;
-            logger.info("socket established " + sock + " to port " + port + " now waiting 5 seconds to send anything...");
-            Thread.sleep(5000L);
-            for (int i = 0; i < 1000; i++) {
-                sock.getOutputStream().write(bytes);
-                totalBytes += bytes.length;
-            }   sock.getOutputStream().flush();
-        }
-        logger.info("Total bytes sent: " + totalBytes + " to port " + port);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
deleted file mode 100644
index 90f4c42..0000000
--- a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.example;
-
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author none
- */
-public class UDPClient {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(UDPClient.class);
-
-    public static void main(final String[] args) throws Exception {
-        final byte[] buffer = UDPClient.makeBytes();
-        final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, new InetSocketAddress("localhost", 20000));
-        final DatagramSocket socket = new DatagramSocket();
-        final long startTime = System.nanoTime();
-        for (int i = 0; i < 819200; i++) { // 100 MB
-            socket.send(packet);
-        }
-        final long endTime = System.nanoTime();
-        final long durationMillis = (endTime - startTime) / 1000000;
-        LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis);
-    }
-
-    public static byte[] makeBytes() {
-        byte[] bytes = new byte[128];
-        return bytes;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
deleted file mode 100644
index 9ec26e9..0000000
--- a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io.nio.example;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.nifi.io.nio.consumer.AbstractStreamConsumer;
-
-/**
- *
- * @author none
- */
-public class UselessStreamConsumer extends AbstractStreamConsumer {
-
-    public UselessStreamConsumer(final String id) {
-        super(id);
-    }
-
-    @Override
-    protected void processBuffer(final ByteBuffer buffer) throws IOException {
-    }
-
-    @Override
-    protected void onConsumerDone() {
-        System.err.println("IN consumer done");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/test/resources/log4j.xml b/commons/nifi-socket-utils/src/test/resources/log4j.xml
deleted file mode 100644
index 8e93769..0000000
--- a/commons/nifi-socket-utils/src/test/resources/log4j.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-
-    <!-- Appender for printing formatted log statements to the console. -->
-    <appender name="console" class="org.apache.log4j.ConsoleAppender">
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p [%t] %40.40c - %m%n"/>
-        </layout>
-    </appender>
-
-    <!-- Logger for managing logging statements for nifi -->
-    <logger name="nifi">
-        <level value="debug"/>
-    </logger>
-
-    <root>
-        <level value="warn"/>
-        <appender-ref ref="console"/>
-    </root>
-</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/.gitignore
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/.gitignore b/commons/nifi-utils/.gitignore
deleted file mode 100755
index 12c5231..0000000
--- a/commons/nifi-utils/.gitignore
+++ /dev/null
@@ -1,8 +0,0 @@
-/target
-/target
-/target
-/target
-/target
-/target
-/target
-/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/pom.xml b/commons/nifi-utils/pom.xml
deleted file mode 100644
index c5c2a68..0000000
--- a/commons/nifi-utils/pom.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-commons-parent</artifactId>
-        <version>0.0.1-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>nifi-utils</artifactId>
-    <version>0.0.1-SNAPSHOT</version>
-    <packaging>jar</packaging>
-    <name>NiFi Utils</name>
-    <!--
-    This project intentionally has no additional dependencies beyond that pulled in by the parent.  It is a general purpose utility library
-    and should keep its surface/tension minimal.
-    -->
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
deleted file mode 100644
index 24f43ca..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.flowfile.attributes;
-
-public enum CoreAttributes implements FlowFileAttributeKey {
-    /**
-     * The flowfile's path indicates the relative directory to which a FlowFile belongs and does not
-     * contain the filename
-     */
-    PATH("path"),
-    
-    /**
-     * The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not
-     * contain the filename
-     */
-    ABSOLUTE_PATH("absolute.path"),
-    
-    /**
-     * The filename of the FlowFile. The filename should not contain any directory structure.
-     */
-    FILENAME("filename"),
-    
-    /**
-     * A unique UUID assigned to this FlowFile
-     */
-    UUID("uuid"),
-    
-    /**
-     * A numeric value indicating the FlowFile priority
-     */
-    PRIORITY("priority"),
-    
-    /**
-     * The MIME Type of this FlowFile
-     */
-    MIME_TYPE("mime.type"),
-    
-    /**
-     * Specifies the reason that a FlowFile is being discarded
-     */
-    DISCARD_REASON("discard.reason"),
-
-    /**
-     * Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
-     */
-    ALTERNATE_IDENTIFIER("alternate.identifier");
-    
-    private final String key;
-    private CoreAttributes(final String key) {
-        this.key = key;
-    }
-    
-    @Override
-    public String key() {
-        return key;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
deleted file mode 100644
index cc6c28e..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.flowfile.attributes;
-
-public interface FlowFileAttributeKey {
-    String key();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
deleted file mode 100644
index 77c34c9..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-public class StandardVersionNegotiator implements VersionNegotiator {
-
-    private final List<Integer> versions;
-    private int curVersion;
-
-    public StandardVersionNegotiator(final int... supportedVersions) {
-        if (Objects.requireNonNull(supportedVersions).length == 0) {
-            throw new IllegalArgumentException("At least one version must be supported");
-        }
-
-        final List<Integer> supported = new ArrayList<>();
-        for (final int version : supportedVersions) {
-            supported.add(version);
-        }
-        this.versions = Collections.unmodifiableList(supported);
-        this.curVersion = supportedVersions[0];
-    }
-
-    @Override
-    public int getVersion() {
-        return curVersion;
-    }
-
-    @Override
-    public void setVersion(final int version) throws IllegalArgumentException {
-        if (!isVersionSupported(version)) {
-            throw new IllegalArgumentException("Version " + version + " is not supported");
-        }
-
-        this.curVersion = version;
-    }
-
-    @Override
-    public int getPreferredVersion() {
-        return versions.get(0);
-    }
-
-    @Override
-    public Integer getPreferredVersion(final int maxVersion) {
-        for (final Integer version : this.versions) {
-            if (maxVersion >= version) {
-                return version;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public boolean isVersionSupported(final int version) {
-        return versions.contains(version);
-    }
-
-    @Override
-    public List<Integer> getSupportedVersions() {
-        return versions;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
deleted file mode 100644
index 74f9b3d..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import java.util.List;
-
-public interface VersionNegotiator {
-
-    /**
-     * @return the currently configured Version of this resource
-     */
-    int getVersion();
-
-    /**
-     * Sets the version of this resource to the specified version. Only the
-     * lower byte of the version is relevant.
-     *
-     * @param version
-     * @throws IllegalArgumentException if the given Version is not supported by
-     * this resource, as is indicated by the {@link #isVersionSupported(int)}
-     * method
-     */
-    void setVersion(int version) throws IllegalArgumentException;
-
-    /**
-     *
-     * @return the Version of this resource that is preferred
-     */
-    int getPreferredVersion();
-
-    /**
-     * Gets the preferred version of this resource that is no greater than the
-     * given maxVersion. If no acceptable version exists that is less than
-     * <code>maxVersion</code>, then <code>null</code> is returned
-     *
-     * @param maxVersion
-     * @return
-     */
-    Integer getPreferredVersion(int maxVersion);
-
-    /**
-     * Indicates whether or not the specified version is supported by this
-     * resource
-     *
-     * @param version
-     * @return
-     */
-    boolean isVersionSupported(int version);
-
-    List<Integer> getSupportedVersions();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
deleted file mode 100644
index 05fd915..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-/**
- * Indicates that the user disabled transmission while communications were
- * taking place with a peer
- */
-public class TransmissionDisabledException extends RuntimeException {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
deleted file mode 100644
index 71cf894..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.zip.DataFormatException;
-import java.util.zip.Inflater;
-
-public class CompressionInputStream extends InputStream {
-
-    private final InputStream in;
-    private final Inflater inflater;
-
-    private byte[] compressedBuffer;
-    private byte[] buffer;
-
-    private int bufferIndex;
-    private boolean eos = false;    // whether or not we've reached the end of stream
-    private boolean allDataRead = false;    // different from eos b/c eos means allDataRead == true && buffer is empty
-
-    private final byte[] fourByteBuffer = new byte[4];
-
-    public CompressionInputStream(final InputStream in) {
-        this.in = in;
-        inflater = new Inflater();
-
-        buffer = new byte[0];
-        compressedBuffer = new byte[0];
-        bufferIndex = 1;
-    }
-
-    private String toHex(final byte[] array) {
-        final StringBuilder sb = new StringBuilder("0x");
-        for (final byte b : array) {
-            final String hex = Integer.toHexString(b).toUpperCase();
-            if (hex.length() == 1) {
-                sb.append("0");
-            }
-            sb.append(hex);
-        }
-        return sb.toString();
-    }
-
-    protected void readChunkHeader() throws IOException {
-        // Ensure that we have a valid SYNC chunk
-        fillBuffer(fourByteBuffer);
-        if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) {
-            throw new IOException("Invalid CompressionInputStream. Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer));
-        }
-
-        // determine the size of the decompressed buffer
-        fillBuffer(fourByteBuffer);
-        buffer = new byte[toInt(fourByteBuffer)];
-
-        // determine the size of the compressed buffer
-        fillBuffer(fourByteBuffer);
-        compressedBuffer = new byte[toInt(fourByteBuffer)];
-
-        bufferIndex = buffer.length;	// indicate that buffer is empty
-    }
-
-    private int toInt(final byte[] data) {
-        return ((data[0] & 0xFF) << 24)
-                | ((data[1] & 0xFF) << 16)
-                | ((data[2] & 0xFF) << 8)
-                | (data[3] & 0xFF);
-    }
-
-    protected void bufferAndDecompress() throws IOException {
-        if (allDataRead) {
-            eos = true;
-            return;
-        }
-
-        readChunkHeader();
-        fillBuffer(compressedBuffer);
-
-        inflater.setInput(compressedBuffer);
-        try {
-            inflater.inflate(buffer);
-        } catch (final DataFormatException e) {
-            throw new IOException(e);
-        }
-        inflater.reset();
-
-        bufferIndex = 0;
-        final int moreDataByte = in.read();
-        if (moreDataByte < 1) {
-            allDataRead = true;
-        } else if (moreDataByte > 1) {
-            throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte);
-        }
-    }
-
-    private void fillBuffer(final byte[] buffer) throws IOException {
-        int len;
-        int bytesLeft = buffer.length;
-        int bytesRead = 0;
-        while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) {
-            bytesLeft -= len;
-            bytesRead += len;
-        }
-
-        if (bytesRead < buffer.length) {
-            throw new EOFException();
-        }
-    }
-
-    private boolean isBufferEmpty() {
-        return bufferIndex >= buffer.length;
-    }
-
-    @Override
-    public int read() throws IOException {
-        if (eos) {
-            return -1;
-        }
-
-        if (isBufferEmpty()) {
-            bufferAndDecompress();
-        }
-
-        if (isBufferEmpty()) {
-            eos = true;
-            return -1;
-        }
-
-        return buffer[bufferIndex++];
-    }
-
-    @Override
-    public int read(final byte[] b) throws IOException {
-        return read(b, 0, b.length);
-    }
-
-    @Override
-    public int read(final byte[] b, final int off, final int len) throws IOException {
-        if (eos) {
-            return -1;
-        }
-
-        if (isBufferEmpty()) {
-            bufferAndDecompress();
-        }
-
-        if (isBufferEmpty()) {
-            eos = true;
-            return -1;
-        }
-
-        final int free = buffer.length - bufferIndex;
-        final int bytesToTransfer = Math.min(len, free);
-        System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer);
-        bufferIndex += bytesToTransfer;
-
-        return bytesToTransfer;
-    }
-
-    /**
-     * Does nothing. Does NOT close underlying InputStream
-     * @throws java.io.IOException
-     */
-    @Override
-    public void close() throws IOException {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
deleted file mode 100644
index bc46b0f..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.zip.Deflater;
-
-public class CompressionOutputStream extends OutputStream {
-
-    public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'};
-
-    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
-    public static final int DEFAULT_BUFFER_SIZE = 64 << 10;
-    public static final int MIN_BUFFER_SIZE = 8 << 10;
-
-    private final OutputStream out;
-    private final Deflater deflater;
-
-    private final byte[] buffer;
-    private final byte[] compressed;
-
-    private int bufferIndex = 0;
-    private boolean dataWritten = false;
-
-    public CompressionOutputStream(final OutputStream outStream) {
-        this(outStream, DEFAULT_BUFFER_SIZE);
-    }
-
-    public CompressionOutputStream(final OutputStream outStream, final int bufferSize) {
-        this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY);
-    }
-
-    public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) {
-        if (bufferSize < MIN_BUFFER_SIZE) {
-            throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE);
-        }
-
-        this.out = outStream;
-        this.deflater = new Deflater(level);
-        this.deflater.setStrategy(strategy);
-        buffer = new byte[bufferSize];
-        compressed = new byte[bufferSize + 64];
-    }
-
-    /**
-     * Compresses the currently buffered chunk of data and sends it to the
-     * output stream
-     *
-     * @throws IOException
-     */
-    protected void compressAndWrite() throws IOException {
-        if (bufferIndex <= 0) {
-            return;
-        }
-
-        deflater.setInput(buffer, 0, bufferIndex);
-        deflater.finish();
-        final int compressedBytes = deflater.deflate(compressed);
-
-        writeChunkHeader(compressedBytes);
-        out.write(compressed, 0, compressedBytes);
-
-        bufferIndex = 0;
-        deflater.reset();
-    }
-
-    private void writeChunkHeader(final int compressedBytes) throws IOException {
-        // If we have already written data, write out a '1' to indicate that we have more data; when we close
-        // the stream, we instead write a '0' to indicate that we are finished sending data.
-        if (dataWritten) {
-            out.write(1);
-        }
-        out.write(SYNC_BYTES);
-        dataWritten = true;
-
-        writeInt(out, bufferIndex);
-        writeInt(out, compressedBytes);
-    }
-
-    private void writeInt(final OutputStream out, final int val) throws IOException {
-        out.write(val >>> 24);
-        out.write(val >>> 16);
-        out.write(val >>> 8);
-        out.write(val);
-    }
-
-    protected boolean bufferFull() {
-        return bufferIndex >= buffer.length;
-    }
-
-    @Override
-    public void write(final int b) throws IOException {
-        buffer[bufferIndex++] = (byte) (b & 0xFF);
-        if (bufferFull()) {
-            compressAndWrite();
-        }
-    }
-
-    @Override
-    public void write(final byte[] b) throws IOException {
-        write(b, 0, b.length);
-    }
-
-    @Override
-    public void write(final byte[] b, final int off, final int len) throws IOException {
-        int bytesLeft = len;
-        while (bytesLeft > 0) {
-            final int free = buffer.length - bufferIndex;
-            final int bytesThisIteration = Math.min(bytesLeft, free);
-            System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration);
-            bufferIndex += bytesThisIteration;
-
-            bytesLeft -= bytesThisIteration;
-            if (bufferFull()) {
-                compressAndWrite();
-            }
-        }
-    }
-
-    @Override
-    public void flush() throws IOException {
-        compressAndWrite();
-        super.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-        compressAndWrite();
-        out.write(0);   // indicate that the stream is finished.
-        out.flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
deleted file mode 100644
index e03dfbf..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-public class InterruptableInputStream extends InputStream {
-
-    private volatile boolean interrupted = false;
-    private final InputStream in;
-
-    public InterruptableInputStream(final InputStream in) {
-        this.in = in;
-    }
-
-    @Override
-    public int read() throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        return in.read();
-    }
-
-    @Override
-    public int read(byte[] b) throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        return in.read(b);
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        return in.read(b, off, len);
-    }
-
-    @Override
-    public int available() throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        return in.available();
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        in.close();
-    }
-
-    @Override
-    public synchronized void mark(int readlimit) {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        in.mark(readlimit);
-    }
-
-    @Override
-    public boolean markSupported() {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        return in.markSupported();
-    }
-
-    @Override
-    public synchronized void reset() throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        in.reset();
-    }
-
-    @Override
-    public long skip(long n) throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        return in.skip(n);
-    }
-
-    public void interrupt() {
-        interrupted = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
deleted file mode 100644
index cba5be6..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-public class InterruptableOutputStream extends OutputStream {
-
-    private final OutputStream out;
-    private volatile boolean interrupted = false;
-
-    public InterruptableOutputStream(final OutputStream out) {
-        this.out = out;
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        out.write(b);
-    }
-
-    @Override
-    public void write(byte[] b) throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        out.write(b);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        out.write(b, off, len);
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        out.close();
-    }
-
-    @Override
-    public void flush() throws IOException {
-        if (interrupted) {
-            throw new TransmissionDisabledException();
-        }
-
-        out.flush();
-    }
-
-    public void interrupt() {
-        this.interrupted = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
deleted file mode 100644
index 68913bd..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BufferStateManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(BufferStateManager.class);
-
-    private ByteBuffer buffer;
-    private Direction direction = Direction.WRITE;
-
-    public BufferStateManager(final ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    public BufferStateManager(final ByteBuffer buffer, final Direction direction) {
-        this.buffer = buffer;
-        this.direction = direction;
-    }
-
-    /**
-     * Ensures that the buffer is at least as big as the size specified,
-     * resizing the buffer if necessary. This operation MAY change the direction
-     * of the buffer.
-     *
-     * @param requiredSize
-     */
-    public void ensureSize(final int requiredSize) {
-        if (buffer.capacity() < requiredSize) {
-            final ByteBuffer newBuffer = ByteBuffer.allocate(requiredSize);
-
-            // we have to read buffer so make sure the direction is correct.
-            if (direction == Direction.WRITE) {
-                buffer.flip();
-            }
-
-            // Copy from buffer to newBuffer
-            newBuffer.put(buffer);
-
-            // Swap the buffers
-            buffer = newBuffer;
-
-            // the new buffer is ready to be written to
-            direction = Direction.WRITE;
-        }
-    }
-
-    public ByteBuffer prepareForWrite(final int requiredSize) {
-        ensureSize(requiredSize);
-
-        if (direction == Direction.READ) {
-            direction = Direction.WRITE;
-            buffer.position(buffer.limit());
-        }
-
-        buffer.limit(buffer.capacity());
-        return buffer;
-    }
-
-    public ByteBuffer prepareForRead(final int requiredSize) {
-        ensureSize(requiredSize);
-
-        if (direction == Direction.WRITE) {
-            direction = Direction.READ;
-            buffer.flip();
-        }
-
-        return buffer;
-    }
-
-    /**
-     * Clears the contents of the buffer and sets direction to WRITE
-     */
-    public void clear() {
-        logger.debug("Clearing {}", buffer);
-        buffer.clear();
-        direction = Direction.WRITE;
-    }
-
-    public void compact() {
-        final String before = buffer.toString();
-        buffer.compact();
-        logger.debug("Before compact: {}, after: {}", before, buffer);
-        direction = Direction.WRITE;
-    }
-
-    public static enum Direction {
-
-        READ, WRITE;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
deleted file mode 100644
index 32a3f26..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
-public class SocketChannelInputStream extends InputStream {
-
-    private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
-    private final SocketChannel channel;
-    private volatile int timeoutMillis = 30000;
-
-    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
-    private Byte bufferedByte = null;
-
-    public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
-        // this class expects a non-blocking channel
-        socketChannel.configureBlocking(false);
-        this.channel = socketChannel;
-    }
-
-    public void setTimeout(final int timeoutMillis) {
-        this.timeoutMillis = timeoutMillis;
-    }
-
-    @Override
-    public int read() throws IOException {
-        if (bufferedByte != null) {
-            final int retVal = bufferedByte & 0xFF;
-            bufferedByte = null;
-            return retVal;
-        }
-
-        oneByteBuffer.flip();
-        oneByteBuffer.clear();
-
-        final long maxTime = System.currentTimeMillis() + timeoutMillis;
-        int bytesRead;
-        do {
-            bytesRead = channel.read(oneByteBuffer);
-            if (bytesRead == 0) {
-                if (System.currentTimeMillis() > maxTime) {
-                    throw new SocketTimeoutException("Timed out reading from socket");
-                }
-                try {
-                    TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
-                } catch (InterruptedException e) {
-                    close();
-                    Thread.currentThread().interrupt(); // set the interrupt status
-                    throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
-                }
-            }
-        } while (bytesRead == 0);
-
-        if (bytesRead == -1) {
-            return -1;
-        }
-        oneByteBuffer.flip();
-        return oneByteBuffer.get() & 0xFF;
-    }
-
-    @Override
-    public int read(final byte[] b) throws IOException {
-        return read(b, 0, b.length);
-    }
-
-    @Override
-    public int read(final byte[] b, final int off, final int len) throws IOException {
-        if (bufferedByte != null) {
-            final byte retVal = bufferedByte;
-            bufferedByte = null;
-            b[off] = retVal;
-            return 1;
-        }
-
-        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
-
-        final long maxTime = System.currentTimeMillis() + timeoutMillis;
-        int bytesRead;
-        do {
-            bytesRead = channel.read(buffer);
-            if (bytesRead == 0) {
-                if (System.currentTimeMillis() > maxTime) {
-                    throw new SocketTimeoutException("Timed out reading from socket");
-                }
-                try {
-                    TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
-                } catch (InterruptedException e) {
-                    close();
-                    Thread.currentThread().interrupt(); // set the interrupt status
-                    throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
-                }
-            }
-        } while (bytesRead == 0);
-
-        return bytesRead;
-    }
-
-    @Override
-    public int available() throws IOException {
-        if (bufferedByte != null) {
-            return 1;
-        }
-
-        isDataAvailable(); // attempt to read from socket
-        return (bufferedByte == null) ? 0 : 1;
-    }
-
-    public boolean isDataAvailable() throws IOException {
-        if (bufferedByte != null) {
-            return true;
-        }
-
-        oneByteBuffer.flip();
-        oneByteBuffer.clear();
-        final int bytesRead = channel.read(oneByteBuffer);
-        if (bytesRead == -1) {
-            throw new EOFException("Peer has closed the stream");
-        }
-        if (bytesRead > 0) {
-            oneByteBuffer.flip();
-            bufferedByte = oneByteBuffer.get();
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Closes the underlying socket channel.
-     * @throws java.io.IOException
-     */
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
deleted file mode 100644
index 77049ad..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
-public class SocketChannelOutputStream extends OutputStream {
-
-    private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
-    private final SocketChannel channel;
-    private volatile int timeout = 30000;
-
-    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
-
-    public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
-        // this class expects a non-blocking channel
-        socketChannel.configureBlocking(false);
-        this.channel = socketChannel;
-    }
-
-    public void setTimeout(final int timeoutMillis) {
-        this.timeout = timeoutMillis;
-    }
-
-    @Override
-    public void write(final int b) throws IOException {
-        oneByteBuffer.flip();
-        oneByteBuffer.clear();
-        oneByteBuffer.put((byte) b);
-        oneByteBuffer.flip();
-
-        final int timeoutMillis = this.timeout;
-        long maxTime = System.currentTimeMillis() + timeoutMillis;
-        int bytesWritten;
-        while (oneByteBuffer.hasRemaining()) {
-            bytesWritten = channel.write(oneByteBuffer);
-            if (bytesWritten == 0) {
-                if (System.currentTimeMillis() > maxTime) {
-                    throw new SocketTimeoutException("Timed out writing to socket");
-                }
-                try {
-                    TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
-                } catch (InterruptedException e) {
-                    close();
-                    Thread.currentThread().interrupt(); // set the interrupt status
-                    throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
-                }
-            } else {
-                return;
-            }
-        }
-    }
-
-    @Override
-    public void write(final byte[] b) throws IOException {
-        write(b, 0, b.length);
-    }
-
-    @Override
-    public void write(final byte[] b, final int off, final int len) throws IOException {
-        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
-
-        final int timeoutMillis = this.timeout;
-        long maxTime = System.currentTimeMillis() + timeoutMillis;
-        int bytesWritten;
-        while (buffer.hasRemaining()) {
-            bytesWritten = channel.write(buffer);
-            if (bytesWritten == 0) {
-                if (System.currentTimeMillis() > maxTime) {
-                    throw new SocketTimeoutException("Timed out writing to socket");
-                }
-                try {
-                    TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
-                } catch (InterruptedException e) {
-                    close();
-                    Thread.currentThread().interrupt(); // set the interrupt status
-                    throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
-                }
-            } else {
-                maxTime = System.currentTimeMillis() + timeoutMillis;
-            }
-        }
-    }
-
-    /**
-     * Closes the underlying SocketChannel
-     * @throws java.io.IOException
-     */
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-}


Mime
View raw message