nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [38/51] [partial] incubator-nifi git commit: Reworked overall directory structure to make releasing nifi vs maven plugis easier
Date Fri, 16 Jan 2015 02:29:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
deleted file mode 100644
index ae075b5..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class LeakyBucketStreamThrottler implements StreamThrottler {
-
-    private final int maxBytesPerSecond;
-    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<Request>();
-    private final ScheduledExecutorService executorService;
-    private final AtomicBoolean shutdown = new AtomicBoolean(false);
-
-    public LeakyBucketStreamThrottler(final int maxBytesPerSecond) {
-        this.maxBytesPerSecond = maxBytesPerSecond;
-
-        executorService = Executors.newSingleThreadScheduledExecutor();
-        final Runnable task = new Drain();
-        executorService.scheduleAtFixedRate(task, 0, 1000, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void close() {
-        this.shutdown.set(true);
-
-        executorService.shutdown();
-        try {
-            // Should not take more than 2 seconds because we run every second. If it takes more than
-            // 2 seconds, it is because the Runnable thread is blocking on a write; in this case,
-            // we will just ignore it and return
-            executorService.awaitTermination(2, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-        }
-    }
-
-    @Override
-    public OutputStream newThrottledOutputStream(final OutputStream toWrap) {
-        return new OutputStream() {
-            @Override
-            public void write(final int b) throws IOException {
-                write(new byte[]{(byte) b}, 0, 1);
-            }
-
-            @Override
-            public void write(byte[] b) throws IOException {
-                write(b, 0, b.length);
-            }
-
-            @Override
-            public void write(byte[] b, int off, int len) throws IOException {
-                final InputStream in = new ByteArrayInputStream(b, off, len);
-                LeakyBucketStreamThrottler.this.copy(in, toWrap);
-            }
-
-            @Override
-            public void close() throws IOException {
-                toWrap.close();
-            }
-
-            @Override
-            public void flush() throws IOException {
-                toWrap.flush();
-            }
-        };
-    }
-
-    @Override
-    public InputStream newThrottledInputStream(final InputStream toWrap) {
-        return new InputStream() {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-            @Override
-            public int read() throws IOException {
-                final ByteArrayOutputStream baos = new ByteArrayOutputStream(1);
-                LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
-                if (baos.getBufferLength() < 1) {
-                    return -1;
-                }
-
-                return baos.getUnderlyingBuffer()[0] & 0xFF;
-            }
-
-            @Override
-            public int read(final byte[] b) throws IOException {
-                if(b.length == 0){
-                    return 0;
-                }
-                return read(b, 0, b.length);
-            }
-
-            @Override
-            public int read(byte[] b, int off, int len) throws IOException {
-                if ( len < 0 ) {
-                    throw new IllegalArgumentException();
-                }
-                if ( len == 0 ) {
-                    return 0;
-                }
-                
-                baos.reset();
-                final int copied = (int) LeakyBucketStreamThrottler.this.copy(toWrap, baos, len);
-                if (copied == 0) {
-                    return -1;
-                }
-                System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied);
-                return copied;
-            }
-
-            @Override
-            public void close() throws IOException {
-                toWrap.close();
-            }
-
-            @Override
-            public int available() throws IOException {
-                return toWrap.available();
-            }
-        };
-    }
-
-    @Override
-    public long copy(final InputStream in, final OutputStream out) throws IOException {
-        return copy(in, out, -1);
-    }
-
-    @Override
-    public long copy(final InputStream in, final OutputStream out, final long maxBytes) throws IOException {
-        long totalBytesCopied = 0;
-        boolean finished = false;
-        while (!finished) {
-            final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes - totalBytesCopied;
-            final Request request = new Request(in, out, requestMax);
-            boolean transferred = false;
-            while (!transferred) {
-                if (shutdown.get()) {
-                    throw new IOException("Throttler shutdown");
-                }
-
-                try {
-                    transferred = requestQueue.offer(request, 1000, TimeUnit.MILLISECONDS);
-                } catch (final InterruptedException e) {
-                    throw new IOException("Interrupted", e);
-                }
-            }
-
-            final BlockingQueue<Response> responseQueue = request.getResponseQueue();
-            Response response = null;
-            while (response == null) {
-                try {
-                    if (shutdown.get()) {
-                        throw new IOException("Throttler shutdown");
-                    }
-                    response = responseQueue.poll(1000L, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    throw new IOException("Interrupted", e);
-                }
-            }
-
-            if (!response.isSuccess()) {
-                throw response.getError();
-            }
-
-            totalBytesCopied += response.getBytesCopied();
-            finished = (response.getBytesCopied() == 0) || (totalBytesCopied >= maxBytes && maxBytes > 0);
-        }
-
-        return totalBytesCopied;
-    }
-
-    /**
-     * This class is responsible for draining water from the leaky bucket. I.e.,
-     * it actually moves the data
-     */
-    private class Drain implements Runnable {
-
-        private final byte[] buffer;
-
-        public Drain() {
-            final int bufferSize = Math.min(4096, maxBytesPerSecond);
-            buffer = new byte[bufferSize];
-        }
-
-        @Override
-        public void run() {
-            final long start = System.currentTimeMillis();
-
-            int bytesTransferred = 0;
-            while (bytesTransferred < maxBytesPerSecond) {
-                final long maxMillisToWait = 1000 - (System.currentTimeMillis() - start);
-                if (maxMillisToWait < 1) {
-                    return;
-                }
-
-                try {
-                    final Request request = requestQueue.poll(maxMillisToWait, TimeUnit.MILLISECONDS);
-                    if (request == null) {
-                        return;
-                    }
-
-                    final BlockingQueue<Response> responseQueue = request.getResponseQueue();
-
-                    final OutputStream out = request.getOutputStream();
-                    final InputStream in = request.getInputStream();
-
-                    try {
-                        final long requestMax = request.getMaxBytesToCopy();
-                        long maxBytesToTransfer;
-                        if (requestMax < 0) {
-                            maxBytesToTransfer = Math.min(buffer.length, maxBytesPerSecond - bytesTransferred);
-                        } else {
-                            maxBytesToTransfer = Math.min(requestMax,
-                                    Math.min(buffer.length, maxBytesPerSecond - bytesTransferred));
-                        }
-                        maxBytesToTransfer = Math.max(1L, maxBytesToTransfer);
-
-                        final int bytesCopied = fillBuffer(in, maxBytesToTransfer);
-                        out.write(buffer, 0, bytesCopied);
-
-                        final Response response = new Response(true, bytesCopied);
-                        responseQueue.put(response);
-                        bytesTransferred += bytesCopied;
-                    } catch (final IOException e) {
-                        final Response response = new Response(e);
-                        responseQueue.put(response);
-                    }
-                } catch (InterruptedException e) {
-                }
-            }
-        }
-
-        private int fillBuffer(final InputStream in, final long maxBytes) throws IOException {
-            int bytesRead = 0;
-            int len;
-            while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, (int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) {
-                bytesRead += len;
-            }
-
-            return bytesRead;
-        }
-    }
-
-    private static class Response {
-
-        private final boolean success;
-        private final IOException error;
-        private final int bytesCopied;
-
-        public Response(final boolean success, final int bytesCopied) {
-            this.success = success;
-            this.bytesCopied = bytesCopied;
-            this.error = null;
-        }
-
-        public Response(final IOException error) {
-            this.success = false;
-            this.error = error;
-            this.bytesCopied = -1;
-        }
-
-        public boolean isSuccess() {
-            return success;
-        }
-
-        public IOException getError() {
-            return error;
-        }
-
-        public int getBytesCopied() {
-            return bytesCopied;
-        }
-    }
-
-    private static class Request {
-
-        private final OutputStream out;
-        private final InputStream in;
-        private final long maxBytesToCopy;
-        private final BlockingQueue<Response> responseQueue;
-
-        public Request(final InputStream in, final OutputStream out, final long maxBytesToCopy) {
-            this.out = out;
-            this.in = in;
-            this.maxBytesToCopy = maxBytesToCopy;
-            this.responseQueue = new LinkedBlockingQueue<Response>(1);
-        }
-
-        public BlockingQueue<Response> getResponseQueue() {
-            return this.responseQueue;
-        }
-
-        public OutputStream getOutputStream() {
-            return out;
-        }
-
-        public InputStream getInputStream() {
-            return in;
-        }
-
-        public long getMaxBytesToCopy() {
-            return maxBytesToCopy;
-        }
-
-        @Override
-        public String toString() {
-            return "Request[maxBytes=" + maxBytesToCopy + "]";
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
deleted file mode 100644
index 0e75a22..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Wraps an InputStream so that the underlying InputStream cannot be closed.
- * This is used so that the InputStream can be wrapped with yet another
- * InputStream and prevent the outer layer from closing the inner InputStream
- */
-public class NonCloseableInputStream extends FilterInputStream {
-
-    private final InputStream toWrap;
-
-    public NonCloseableInputStream(final InputStream toWrap) {
-        super(toWrap);
-        this.toWrap = toWrap;
-    }
-
-    @Override
-    public int read() throws IOException {
-        return toWrap.read();
-    }
-
-    @Override
-    public int read(byte[] b) throws IOException {
-        return toWrap.read(b);
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        return toWrap.read(b, off, len);
-    }
-
-    @Override
-    public void close() throws IOException {
-        // do nothing
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
deleted file mode 100644
index 9c77637..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class NonCloseableOutputStream extends FilterOutputStream {
-
-    private final OutputStream out;
-
-    public NonCloseableOutputStream(final OutputStream out) {
-        super(out);
-        this.out = out;
-    }
-
-    @Override
-    public void write(byte[] b) throws IOException {
-        out.write(b);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-        out.write(b, off, len);
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-        out.write(b);
-    }
-
-    @Override
-    public void close() throws IOException {
-        out.flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
deleted file mode 100644
index 8452761..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * OutputStream that throws away all data, just as if writing to /dev/null
- */
-public class NullOutputStream extends OutputStream {
-
-    @Override
-    public void write(final int b) throws IOException {
-    }
-
-    @Override
-    public void write(final byte[] b) throws IOException {
-    }
-
-    @Override
-    public void write(final byte[] b, int off, int len) throws IOException {
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public void flush() throws IOException {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
deleted file mode 100644
index 9158050..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public interface StreamThrottler extends Closeable {
-
-    long copy(InputStream in, OutputStream out) throws IOException;
-
-    long copy(InputStream in, OutputStream out, long maxBytes) throws IOException;
-
-    InputStream newThrottledInputStream(final InputStream toWrap);
-
-    OutputStream newThrottledOutputStream(final OutputStream toWrap);
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
deleted file mode 100644
index 8e3d606..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.stream.io.exception.BytePatternNotFoundException;
-import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
-
-public class StreamUtils {
-
-    public static long copy(final InputStream source, final OutputStream destination) throws IOException {
-        final byte[] buffer = new byte[8192];
-        int len;
-        long totalCount = 0L;
-        while ((len = source.read(buffer)) > 0) {
-            destination.write(buffer, 0, len);
-            totalCount += len;
-        }
-        return totalCount;
-    }
-
-    /**
-     * Copies <code>numBytes</code> from <code>source</code> to
-     * <code>destination</code>. If <code>numBytes</code> are not available from
-     * <code>source</code>, throws EOFException
-     *
-     * @param source
-     * @param destination
-     * @param numBytes
-     * @throws IOException
-     */
-    public static void copy(final InputStream source, final OutputStream destination, final long numBytes) throws IOException {
-        final byte[] buffer = new byte[8192];
-        int len;
-        long bytesLeft = numBytes;
-        while ((len = source.read(buffer, 0, (int) Math.min(bytesLeft, buffer.length))) > 0) {
-            destination.write(buffer, 0, len);
-            bytesLeft -= len;
-        }
-
-        if (bytesLeft > 0) {
-            throw new EOFException("Attempted to copy " + numBytes + " bytes but only " + (numBytes - bytesLeft) + " bytes were available");
-        }
-    }
-
-    /**
-     * Reads data from the given input stream, copying it to the destination
-     * byte array. If the InputStream has less data than the given byte array,
-     * throws an EOFException
-     *
-     * @param source
-     * @param destination
-     * @throws IOException
-     */
-    public static void fillBuffer(final InputStream source, final byte[] destination) throws IOException {
-        fillBuffer(source, destination, true);
-    }
-
-    /**
-     * Reads data from the given input stream, copying it to the destination
-     * byte array. If the InputStream has less data than the given byte array,
-     * throws an EOFException if <code>ensureCapacity</code> is true and
-     * otherwise returns the number of bytes copied
-     *
-     * @param source
-     * @param destination
-     * @param ensureCapacity whether or not to enforce that the InputStream have
-     * at least as much data as the capacity of the destination byte array
-     * @return the number of bytes read into <code>destination</code>
-     * @throws IOException
-     */
-    public static int fillBuffer(final InputStream source, final byte[] destination, final boolean ensureCapacity) throws IOException {
-        int bytesRead = 0;
-        int len;
-        while (bytesRead < destination.length) {
-            len = source.read(destination, bytesRead, destination.length - bytesRead);
-            if (len < 0) {
-                if (ensureCapacity) {
-                    throw new EOFException();
-                } else {
-                    break;
-                }
-            }
-
-            bytesRead += len;
-        }
-
-        return bytesRead;
-    }
-
-    /**
-     * Copies data from in to out until either we are out of data (returns null)
-     * or we hit one of the byte patterns identified by the
-     * <code>stoppers</code> parameter (returns the byte pattern matched). The
-     * bytes in the stopper will be copied.
-     *
-     * @param in
-     * @param out
-     * @param maxBytes
-     * @param stoppers
-     * @return the byte array matched, or null if end of stream was reached
-     * @throws IOException
-     */
-    public static byte[] copyInclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
-        if (stoppers.length == 0) {
-            return null;
-        }
-
-        final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
-        for (final byte[] stopper : stoppers) {
-            circularBuffers.add(new NonThreadSafeCircularBuffer(stopper));
-        }
-
-        long bytesRead = 0;
-        while (true) {
-            final int next = in.read();
-            if (next == -1) {
-                return null;
-            } else if (maxBytes > 0 && ++bytesRead >= maxBytes) {
-                throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
-            }
-
-            out.write(next);
-
-            for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
-                if (circ.addAndCompare((byte) next)) {
-                    return circ.getByteArray();
-                }
-            }
-        }
-    }
-
-    /**
-     * Copies data from in to out until either we are out of data (returns null)
-     * or we hit one of the byte patterns identified by the
-     * <code>stoppers</code> parameter (returns the byte pattern matched). The
-     * byte pattern matched will NOT be copied to the output and will be un-read
-     * from the input.
-     *
-     * @param in
-     * @param out
-     * @param maxBytes
-     * @param stoppers
-     * @return the byte array matched, or null if end of stream was reached
-     * @throws IOException
-     */
-    public static byte[] copyExclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
-        if (stoppers.length == 0) {
-            return null;
-        }
-
-        int longest = 0;
-        NonThreadSafeCircularBuffer longestBuffer = null;
-        final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
-        for (final byte[] stopper : stoppers) {
-            final NonThreadSafeCircularBuffer circularBuffer = new NonThreadSafeCircularBuffer(stopper);
-            if (stopper.length > longest) {
-                longest = stopper.length;
-                longestBuffer = circularBuffer;
-                circularBuffers.add(0, circularBuffer);
-            } else {
-                circularBuffers.add(circularBuffer);
-            }
-        }
-
-        long bytesRead = 0;
-        while (true) {
-            final int next = in.read();
-            if (next == -1) {
-                return null;
-            } else if (maxBytes > 0 && bytesRead++ > maxBytes) {
-                throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
-            }
-
-            for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
-                if (circ.addAndCompare((byte) next)) {
-                    // The longest buffer has some data that may not have been written out yet; we need to make sure
-                    // that we copy out those bytes.
-                    final int bytesToCopy = longest - circ.getByteArray().length;
-                    for (int i = 0; i < bytesToCopy; i++) {
-                        final int oldestByte = longestBuffer.getOldestByte();
-                        if (oldestByte != -1) {
-                            out.write(oldestByte);
-                            longestBuffer.addAndCompare((byte) 0);
-                        }
-                    }
-
-                    return circ.getByteArray();
-                }
-            }
-
-            if (longestBuffer.isFilled()) {
-                out.write(longestBuffer.getOldestByte());
-            }
-        }
-    }
-
-    /**
-     * Skips the specified number of bytes from the InputStream
-     *
-     * If unable to skip that number of bytes, throws EOFException
-     *
-     * @param stream
-     * @param bytesToSkip
-     * @throws IOException
-     */
-    public static void skip(final InputStream stream, final long bytesToSkip) throws IOException {
-        if (bytesToSkip <= 0) {
-            return;
-        }
-        long totalSkipped = 0L;
-
-        // If we have a FileInputStream, calling skip(1000000) will return 1000000 even if the file is only
-        // 3 bytes. As a result, we will skip 1 less than the number requested, and then read the last
-        // byte in order to make sure that we've consumed the number of bytes requested. We then check that
-        // the final byte, which we read, is not -1.
-        final long actualBytesToSkip = bytesToSkip - 1;
-        while (totalSkipped < actualBytesToSkip) {
-            final long skippedThisIteration = stream.skip(actualBytesToSkip - totalSkipped);
-            if (skippedThisIteration == 0) {
-                final int nextByte = stream.read();
-                if (nextByte == -1) {
-                    throw new EOFException();
-                } else {
-                    totalSkipped++;
-                }
-            }
-
-            totalSkipped += skippedThisIteration;
-        }
-
-        final int lastByte = stream.read();
-        if (lastByte == -1) {
-            throw new EOFException();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
deleted file mode 100644
index 2b9050d..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.OutputStream;
-
-/**
- * This class extends the {@link java.util.zip.ZipOutputStream} by providing a
- * constructor that allows the user to specify the compression level. The
- * default compression level is 1, as opposed to Java's default of 5.
- */
-public class ZipOutputStream extends java.util.zip.ZipOutputStream {
-
-    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
-
-    public ZipOutputStream(final OutputStream out) {
-        this(out, DEFAULT_COMPRESSION_LEVEL);
-    }
-
-    public ZipOutputStream(final OutputStream out, final int compressionLevel) {
-        super(out);
-        def.setLevel(compressionLevel);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
deleted file mode 100644
index 5d08616..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io.exception;
-
-import java.io.IOException;
-
-public class BytePatternNotFoundException extends IOException {
-
-    private static final long serialVersionUID = -4128911284318513973L;
-
-    public BytePatternNotFoundException(final String explanation) {
-        super(explanation);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
deleted file mode 100644
index b4b4c17..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io.util;
-
-import java.util.Arrays;
-
-public class NonThreadSafeCircularBuffer {
-
-    private final byte[] lookingFor;
-    private final int[] buffer;
-    private int insertionPointer = 0;
-    private int bufferSize = 0;
-
-    public NonThreadSafeCircularBuffer(final byte[] lookingFor) {
-        this.lookingFor = lookingFor;
-        buffer = new int[lookingFor.length];
-        Arrays.fill(buffer, -1);
-    }
-
-    public byte[] getByteArray() {
-        return lookingFor;
-    }
-
-    /**
-     * Returns the oldest byte in the buffer
-     *
-     * @return
-     */
-    public int getOldestByte() {
-        return buffer[insertionPointer];
-    }
-
-    public boolean isFilled() {
-        return bufferSize >= buffer.length;
-    }
-
-    public boolean addAndCompare(final byte data) {
-        buffer[insertionPointer] = data;
-        insertionPointer = (insertionPointer + 1) % lookingFor.length;
-
-        bufferSize++;
-        if (bufferSize < lookingFor.length) {
-            return false;
-        }
-
-        for (int i = 0; i < lookingFor.length; i++) {
-            final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
-            if (compare != lookingFor[i]) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
deleted file mode 100644
index 92061e0..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-public class BooleanHolder extends ObjectHolder<Boolean> {
-
-    public BooleanHolder(final boolean initialValue) {
-        super(initialValue);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
deleted file mode 100644
index 805223f..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class FormatUtils {
-
-    private static final String UNION = "|";
-
-    // for Data Sizes
-    private static final double BYTES_IN_KILOBYTE = 1024;
-    private static final double BYTES_IN_MEGABYTE = BYTES_IN_KILOBYTE * 1024;
-    private static final double BYTES_IN_GIGABYTE = BYTES_IN_MEGABYTE * 1024;
-    private static final double BYTES_IN_TERABYTE = BYTES_IN_GIGABYTE * 1024;
-
-    // for Time Durations
-    private static final String NANOS = join(UNION, "ns", "nano", "nanos", "nanoseconds");
-    private static final String MILLIS = join(UNION, "ms", "milli", "millis", "milliseconds");
-    private static final String SECS = join(UNION, "s", "sec", "secs", "second", "seconds");
-    private static final String MINS = join(UNION, "m", "min", "mins", "minute", "minutes");
-    private static final String HOURS = join(UNION, "h", "hr", "hrs", "hour", "hours");
-    private static final String DAYS = join(UNION, "d", "day", "days");
-
-    private static final String VALID_TIME_UNITS = join(UNION, NANOS, MILLIS, SECS, MINS, HOURS, DAYS);
-    public static final String TIME_DURATION_REGEX = "(\\d+)\\s*(" + VALID_TIME_UNITS + ")";
-    public static final Pattern TIME_DURATION_PATTERN = Pattern.compile(TIME_DURATION_REGEX);
-
-    /**
-     * Formats the specified count by adding commas.
-     *
-     * @param count
-     * @return
-     */
-    public static String formatCount(final long count) {
-        return NumberFormat.getIntegerInstance().format(count);
-    }
-
-    /**
-     * Formats the specified duration in 'mm:ss.SSS' format.
-     *
-     * @param sourceDuration
-     * @param sourceUnit
-     * @return
-     */
-    public static String formatMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
-        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
-        final SimpleDateFormat formatter = new SimpleDateFormat("mm:ss.SSS", Locale.US);
-        return formatter.format(new Date(millis));
-    }
-
-    /**
-     * Formats the specified duration in 'HH:mm:ss.SSS' format.
-     *
-     * @param sourceDuration
-     * @param sourceUnit
-     * @return
-     */
-    public static String formatHoursMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) {
-        final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit);
-        final long millisInHour = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
-        final int hours = (int) (millis / millisInHour);
-        final long whatsLeft = millis - hours * millisInHour;
-
-        return pad(hours) + ":" + new SimpleDateFormat("mm:ss.SSS", Locale.US).format(new Date(whatsLeft));
-    }
-
-    private static String pad(final int val) {
-        return (val < 10) ? "0" + val : String.valueOf(val);
-    }
-
-    /**
-     * Formats the specified data size in human readable format.
-     *
-     * @param dataSize Data size in bytes
-     * @return Human readable format
-     */
-    public static String formatDataSize(final double dataSize) {
-        // initialize the formatter
-        final NumberFormat format = NumberFormat.getNumberInstance();
-        format.setMaximumFractionDigits(2);
-
-        // check terabytes
-        double dataSizeToFormat = dataSize / BYTES_IN_TERABYTE;
-        if (dataSizeToFormat > 1) {
-            return format.format(dataSizeToFormat) + " TB";
-        }
-
-        // check gigabytes
-        dataSizeToFormat = dataSize / BYTES_IN_GIGABYTE;
-        if (dataSizeToFormat > 1) {
-            return format.format(dataSizeToFormat) + " GB";
-        }
-
-        // check megabytes
-        dataSizeToFormat = dataSize / BYTES_IN_MEGABYTE;
-        if (dataSizeToFormat > 1) {
-            return format.format(dataSizeToFormat) + " MB";
-        }
-
-        // check kilobytes
-        dataSizeToFormat = dataSize / BYTES_IN_KILOBYTE;
-        if (dataSizeToFormat > 1) {
-            return format.format(dataSizeToFormat) + " KB";
-        }
-
-        // default to bytes
-        return format.format(dataSize) + " bytes";
-    }
-
-    public static long getTimeDuration(final String value, final TimeUnit desiredUnit) {
-        final Matcher matcher = TIME_DURATION_PATTERN.matcher(value.toLowerCase());
-        if (!matcher.matches()) {
-            throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration");
-        }
-
-        final String duration = matcher.group(1);
-        final String units = matcher.group(2);
-        TimeUnit specifiedTimeUnit = null;
-        switch (units.toLowerCase()) {
-            case "ns":
-            case "nano":
-            case "nanos":
-            case "nanoseconds":
-                specifiedTimeUnit = TimeUnit.NANOSECONDS;
-                break;
-            case "ms":
-            case "milli":
-            case "millis":
-            case "milliseconds":
-                specifiedTimeUnit = TimeUnit.MILLISECONDS;
-                break;
-            case "s":
-            case "sec":
-            case "secs":
-            case "second":
-            case "seconds":
-                specifiedTimeUnit = TimeUnit.SECONDS;
-                break;
-            case "m":
-            case "min":
-            case "mins":
-            case "minute":
-            case "minutes":
-                specifiedTimeUnit = TimeUnit.MINUTES;
-                break;
-            case "h":
-            case "hr":
-            case "hrs":
-            case "hour":
-            case "hours":
-                specifiedTimeUnit = TimeUnit.HOURS;
-                break;
-            case "d":
-            case "day":
-            case "days":
-                specifiedTimeUnit = TimeUnit.DAYS;
-                break;
-        }
-
-        final long durationVal = Long.parseLong(duration);
-        return desiredUnit.convert(durationVal, specifiedTimeUnit);
-    }
-
-    public static String formatUtilization(final double utilization) {
-        return utilization + "%";
-    }
-
-    private static String join(final String delimiter, final String... values) {
-        if (values.length == 0) {
-            return "";
-        } else if (values.length == 1) {
-            return values[0];
-        }
-
-        final StringBuilder sb = new StringBuilder();
-        sb.append(values[0]);
-        for (int i = 1; i < values.length; i++) {
-            sb.append(delimiter).append(values[i]);
-        }
-
-        return sb.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
deleted file mode 100644
index 213bbc0..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-public class IntegerHolder extends ObjectHolder<Integer> {
-
-    public IntegerHolder(final int initialValue) {
-        super(initialValue);
-    }
-
-    public int addAndGet(final int delta) {
-        final int curValue = get();
-        final int newValue = curValue + delta;
-        set(newValue);
-        return newValue;
-    }
-
-    public int getAndAdd(final int delta) {
-        final int curValue = get();
-        final int newValue = curValue + delta;
-        set(newValue);
-        return curValue;
-    }
-
-    public int incrementAndGet() {
-        return addAndGet(1);
-    }
-
-    public int getAndIncrement() {
-        return getAndAdd(1);
-    }
-
-    public int decrementAndGet() {
-        return addAndGet(-1);
-    }
-
-    public int getAndDecrement() {
-        return getAndAdd(-1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
deleted file mode 100644
index ef70ce8..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-/**
- * Wraps a Long value so that it can be declared <code>final</code> and still be
- * accessed from which inner classes; the functionality is similar to that of an
- * AtomicLong, but operations on this class are not atomic. This results in
- * greater performance when the atomicity is not needed.
- */
-public class LongHolder extends ObjectHolder<Long> {
-
-    public LongHolder(final long initialValue) {
-        super(initialValue);
-    }
-
-    public long addAndGet(final long delta) {
-        final long curValue = get();
-        final long newValue = curValue + delta;
-        set(newValue);
-        return newValue;
-    }
-
-    public long getAndAdd(final long delta) {
-        final long curValue = get();
-        final long newValue = curValue + delta;
-        set(newValue);
-        return curValue;
-    }
-
-    public long incrementAndGet() {
-        return addAndGet(1);
-    }
-
-    public long getAndIncrement() {
-        return getAndAdd(1);
-    }
-
-    public long decrementAndGet() {
-        return addAndGet(-1L);
-    }
-
-    public long getAndDecrement() {
-        return getAndAdd(-1L);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
deleted file mode 100644
index 85bfd96..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.util.Arrays;
-
-/**
- * <p>
- * A RingBuffer that can be used to scan byte sequences for subsequences.
- * </p>
- *
- * <p>
- * This class implements an efficient naive search algorithm, which allows the
- * user of the library to identify byte sequences in a stream on-the-fly so that
- * the stream can be segmented without having to buffer the data.
- * </p>
- *
- * <p>
- * The intended usage paradigm is:
- * <code>
- * <pre>
- * final byte[] searchSequence = ...;
- * final CircularBuffer buffer = new CircularBuffer(searchSequence);
- * while ((int nextByte = in.read()) > 0) {
- *      if ( buffer.addAndCompare(nextByte) ) {
- *          // This byte is the last byte in the given sequence
- *      } else {
- *          // This byte does not complete the given sequence
- *      }
- * }
- * </pre>
- * </code>
- * </p>
- */
-public class NaiveSearchRingBuffer {
-
-    private final byte[] lookingFor;
-    private final int[] buffer;
-    private int insertionPointer = 0;
-    private int bufferSize = 0;
-
-    public NaiveSearchRingBuffer(final byte[] lookingFor) {
-        this.lookingFor = lookingFor;
-        this.buffer = new int[lookingFor.length];
-        Arrays.fill(buffer, -1);
-    }
-
-    /**
-     * Returns the contents of the internal buffer, which represents the last X
-     * bytes added to the buffer, where X is the minimum of the number of bytes
-     * added to the buffer or the length of the byte sequence for which we are
-     * looking
-     *
-     * @return
-     */
-    public byte[] getBufferContents() {
-        final int contentLength = Math.min(lookingFor.length, bufferSize);
-        final byte[] contents = new byte[contentLength];
-        for (int i = 0; i < contentLength; i++) {
-            final byte nextByte = (byte) buffer[(insertionPointer + i) % lookingFor.length];
-            contents[i] = nextByte;
-        }
-        return contents;
-    }
-
-    /**
-     * Returns the oldest byte in the buffer
-     *
-     * @return
-     */
-    public int getOldestByte() {
-        return buffer[insertionPointer];
-    }
-
-    /**
-     * Returns <code>true</code> if the number of bytes that have been added to
-     * the buffer is at least equal to the length of the byte sequence for which
-     * we are searching
-     *
-     * @return
-     */
-    public boolean isFilled() {
-        return bufferSize >= buffer.length;
-    }
-
-    /**
-     * Clears the internal buffer so that a new search may begin
-     */
-    public void clear() {
-        Arrays.fill(buffer, -1);
-        insertionPointer = 0;
-        bufferSize = 0;
-    }
-
-    /**
-     * Add the given byte to the buffer and notify whether or not the byte
-     * completes the desired byte sequence.
-     *
-     * @param data
-     * @return <code>true</code> if this byte completes the byte sequence,
-     * <code>false</code> otherwise.
-     */
-    public boolean addAndCompare(final byte data) {
-        buffer[insertionPointer] = data;
-        insertionPointer = (insertionPointer + 1) % lookingFor.length;
-
-        bufferSize++;
-        if (bufferSize < lookingFor.length) {
-            return false;
-        }
-
-        for (int i = 0; i < lookingFor.length; i++) {
-            final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
-            if (compare != lookingFor[i]) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
deleted file mode 100644
index a58ec6a..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-/**
- * A bean that holds a single value of type T.
- *
- * @param <T>
- */
-public class ObjectHolder<T> {
-
-    private T value;
-
-    public ObjectHolder(final T initialValue) {
-        this.value = initialValue;
-    }
-
-    public T get() {
-        return value;
-    }
-
-    public void set(T value) {
-        this.value = value;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
deleted file mode 100644
index c0bb830..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Thread-safe implementation of a fixed-capacity RingBuffer. Once the last
- * slot has been written, each further {@link #add(Object)} overwrites the
- * oldest element. Read operations share a read lock; operations that modify
- * the buffer take the write lock.
- *
- * @param <T> the type of element stored in the buffer
- */
-public class RingBuffer<T> {
-
-    private final Object[] buffer;
-    // Slot that the next add() writes to. Once the buffer has wrapped, this is
-    // also the slot holding the oldest element.
-    private int insertionPointer = 0;
-    // Becomes true the first time the last slot of the array is written.
-    private boolean filled = false;
-
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock();
-    private final Lock writeLock = rwLock.writeLock();
-
-    /**
-     * @param size the number of slots in the buffer
-     */
-    public RingBuffer(final int size) {
-        buffer = new Object[size];
-    }
-
-    /**
-     * Adds the given value to the RingBuffer and returns the value that was
-     * removed in order to make room.
-     *
-     * @param value the element to add; must not be null
-     * @return the element that previously occupied the slot, or {@code null}
-     * if the slot was empty
-     * @throws NullPointerException if {@code value} is null
-     */
-    @SuppressWarnings("unchecked")
-    public T add(final T value) {
-        Objects.requireNonNull(value);
-
-        writeLock.lock();
-        try {
-            final Object removed = buffer[insertionPointer];
-
-            buffer[insertionPointer] = value;
-
-            if (insertionPointer == buffer.length - 1) {
-                filled = true;
-            }
-
-            insertionPointer = (insertionPointer + 1) % buffer.length;
-            return (T) removed;
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Returns the number of slots that have been written, capped at the
-     * capacity of the buffer. Slots cleared by
-     * {@link #removeSelectedElements(Filter)} are still counted.
-     *
-     * @return the capacity if the buffer has wrapped, otherwise the number of
-     * elements added so far
-     */
-    public int getSize() {
-        readLock.lock();
-        try {
-            return filled ? buffer.length : insertionPointer;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Returns every element accepted by the given filter, in slot order
-     * starting at the insertion pointer (oldest first once the buffer has
-     * wrapped).
-     *
-     * @param filter decides which elements are returned
-     * @return the selected elements
-     */
-    public List<T> getSelectedElements(final Filter<T> filter) {
-        return getSelectedElements(filter, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Returns up to {@code maxElements} elements accepted by the given filter,
-     * in slot order starting at the insertion pointer.
-     *
-     * @param filter decides which elements are returned
-     * @param maxElements the maximum number of elements to return
-     * @return the selected elements
-     */
-    public List<T> getSelectedElements(final Filter<T> filter, final int maxElements) {
-        final List<T> selected = new ArrayList<>(1000);
-        int numSelected = 0;
-        readLock.lock();
-        try {
-            for (int i = 0; i < buffer.length && numSelected < maxElements; i++) {
-                final int idx = (insertionPointer + i) % buffer.length;
-                final Object val = buffer[idx];
-                if (val == null) {
-                    continue;
-                }
-
-                @SuppressWarnings("unchecked")
-                final T element = (T) val;
-                if (filter.select(element)) {
-                    selected.add(element);
-                    numSelected++;
-                }
-            }
-        } finally {
-            readLock.unlock();
-        }
-        return selected;
-    }
-
-    /**
-     * Counts the elements accepted by the given filter.
-     *
-     * @param filter decides which elements are counted
-     * @return the number of elements for which the filter returned true
-     */
-    public int countSelectedElements(final Filter<T> filter) {
-        int numSelected = 0;
-        readLock.lock();
-        try {
-            for (int i = 0; i < buffer.length; i++) {
-                final int idx = (insertionPointer + i) % buffer.length;
-                final Object val = buffer[idx];
-                if (val == null) {
-                    continue;
-                }
-
-                @SuppressWarnings("unchecked")
-                final T element = (T) val;
-                if (filter.select(element)) {
-                    numSelected++;
-                }
-            }
-        } finally {
-            readLock.unlock();
-        }
-
-        return numSelected;
-    }
-
-    /**
-     * Removes all elements from the RingBuffer that match the given filter.
-     * The slots of removed elements are set to {@code null}; the insertion
-     * pointer is not moved.
-     *
-     * @param filter decides which elements are removed
-     * @return the number of elements that were removed
-     */
-    public int removeSelectedElements(final Filter<T> filter) {
-        int count = 0;
-
-        writeLock.lock();
-        try {
-            // Visit slots in the same order as the read-only scans above.
-            for (int i = 0; i < buffer.length; i++) {
-                final int idx = (insertionPointer + i) % buffer.length;
-                final Object val = buffer[idx];
-                if (val == null) {
-                    continue;
-                }
-
-                @SuppressWarnings("unchecked")
-                final T element = (T) val;
-
-                if (filter.select(element)) {
-                    buffer[idx] = null;
-                    // Previously the counter was never updated, so 0 was always returned.
-                    count++;
-                }
-            }
-        } finally {
-            writeLock.unlock();
-        }
-
-        return count;
-    }
-
-    /**
-     * @return all non-null elements, in slot order starting at the insertion
-     * pointer
-     */
-    public List<T> asList() {
-        return getSelectedElements(new Filter<T>() {
-            @Override
-            public boolean select(final T value) {
-                return true;
-            }
-        });
-    }
-
-    /**
-     * @return the content of the slot at the insertion pointer, which is the
-     * oldest element once the buffer has wrapped; may be {@code null} if that
-     * slot is empty
-     */
-    public T getOldestElement() {
-        readLock.lock();
-        try {
-            return getElementData(insertionPointer);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * @return the content of the slot immediately before the insertion
-     * pointer, i.e. the most recently written slot; may be {@code null} if
-     * that slot is empty
-     */
-    public T getNewestElement() {
-        readLock.lock();
-        try {
-            // When the pointer has wrapped to 0 the newest slot is the LAST valid
-            // index (length - 1); using buffer.length here was out of bounds.
-            final int index = (insertionPointer == 0) ? buffer.length - 1 : insertionPointer - 1;
-            return getElementData(index);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private T getElementData(final int index) {
-        // The read lock is reentrant, so this is safe when the caller already holds it.
-        readLock.lock();
-        try {
-            return (T) buffer[index];
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Iterates over each element in the RingBuffer, calling the
-     * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
-     * in the RingBuffer. If the Evaluator returns {@code false}, the method
-     * will skip all remaining elements in the RingBuffer; otherwise, the next
-     * element will be evaluated until all elements have been evaluated.
-     *
-     * @param evaluator the callback invoked for each non-null element
-     */
-    public void forEach(final ForEachEvaluator<T> evaluator) {
-        forEach(evaluator, IterationDirection.FORWARD);
-    }
-
-    /**
-     * Iterates over each element in the RingBuffer, calling the
-     * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element
-     * in the RingBuffer. If the Evaluator returns {@code false}, the method
-     * will skip all remaining elements in the RingBuffer; otherwise, the next
-     * element will be evaluated until all elements have been evaluated.
-     *
-     * @param evaluator the callback invoked for each non-null element
-     * @param iterationDirection the order in which to iterate over the elements
-     * in the RingBuffer
-     */
-    public void forEach(final ForEachEvaluator<T> evaluator, final IterationDirection iterationDirection) {
-        readLock.lock();
-        try {
-            final int startIndex;
-            final int endIndex;
-            final int increment;
-
-            if (iterationDirection == IterationDirection.FORWARD) {
-                startIndex = 0;
-                endIndex = buffer.length - 1;
-                increment = 1;
-            } else {
-                startIndex = buffer.length - 1;
-                endIndex = 0;
-                increment = -1;
-            }
-
-            for (int i = startIndex; (iterationDirection == IterationDirection.FORWARD ? i <= endIndex : i >= endIndex); i += increment) {
-                // Offsets are relative to the insertion pointer, so offset 0 is the
-                // slot at the pointer and offset length - 1 is the slot before it.
-                final int idx = (insertionPointer + i) % buffer.length;
-                final Object val = buffer[idx];
-                if (val == null) {
-                    continue;
-                }
-
-                @SuppressWarnings("unchecked")
-                final T element = (T) val;
-                if (!evaluator.evaluate(element)) {
-                    return;
-                }
-            }
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
-     * Predicate used to select elements of the RingBuffer.
-     *
-     * @param <S> the type of element being tested
-     */
-    public static interface Filter<S> {
-
-        /**
-         * @param value the element to test
-         * @return {@code true} if the element is selected
-         */
-        boolean select(S value);
-    }
-
-    /**
-     * Defines an interface that can be used to iterate over all of the elements
-     * in the RingBuffer via the {@link #forEach} method
-     *
-     * @param <S> the type of element being evaluated
-     */
-    public static interface ForEachEvaluator<S> {
-
-        /**
-         * Evaluates the given element and returns {@code true} if the next
-         * element should be evaluated, {@code false} otherwise
-         *
-         * @param value the element to evaluate
-         * @return {@code true} to continue iterating, {@code false} to stop
-         */
-        boolean evaluate(S value);
-    }
-
-    /**
-     * The order in which {@link #forEach(ForEachEvaluator, IterationDirection)}
-     * visits the slots of the buffer.
-     */
-    public static enum IterationDirection {
-
-        FORWARD,
-        BACKWARD;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
deleted file mode 100644
index cd11930..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import java.util.concurrent.TimeUnit;
-
-public final class StopWatch {
-
-    private long startNanos = -1L;
-    private long duration = -1L;
-
-    /**
-     * Creates a StopWatch but does not start it
-     */
-    public StopWatch() {
-        this(false);
-    }
-
-    /**
-     * @param autoStart whether or not the timer should be started automatically
-     */
-    public StopWatch(final boolean autoStart) {
-        if (autoStart) {
-            start();
-        }
-    }
-
-    public void start() {
-        this.startNanos = System.nanoTime();
-        this.duration = -1L;
-    }
-
-    public void stop() {
-        if (startNanos < 0) {
-            throw new IllegalStateException("StopWatch has not been started");
-        }
-        this.duration = System.nanoTime() - startNanos;
-        this.startNanos = -1L;
-    }
-
-    /**
-     * Returns the amount of time that the StopWatch was running.
-     *
-     * @param timeUnit
-     * @return
-     *
-     * @throws IllegalStateException if the StopWatch has not been stopped via
-     * {@link #stop()}
-     */
-    public long getDuration(final TimeUnit timeUnit) {
-        if (duration < 0) {
-            throw new IllegalStateException("Cannot get duration until StopWatch has been stopped");
-        }
-        return timeUnit.convert(duration, TimeUnit.NANOSECONDS);
-    }
-
-    /**
-     * Returns the amount of time that has elapsed since the timer was started.
-     *
-     * @param timeUnit
-     * @return
-     */
-    public long getElapsed(final TimeUnit timeUnit) {
-        return timeUnit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
-    }
-
-    public String calculateDataRate(final long bytes) {
-        final double seconds = (double) duration / 1000000000.0D;
-        final long dataSize = (long) (bytes / seconds);
-        return FormatUtils.formatDataSize(dataSize) + "/sec";
-    }
-
-    public String getDuration() {
-        final StringBuilder sb = new StringBuilder();
-
-        long duration = this.duration;
-        final long minutes = (duration > 60000000000L) ? (duration / 60000000000L) : 0L;
-        duration -= TimeUnit.NANOSECONDS.convert(minutes, TimeUnit.MINUTES);
-
-        final long seconds = (duration > 1000000000L) ? (duration / 1000000000L) : 0L;
-        duration -= TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS);
-
-        final long millis = (duration > 1000000L) ? (duration / 1000000L) : 0L;
-        duration -= TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS);
-
-        final long nanos = duration % 1000000L;
-
-        if (minutes > 0) {
-            sb.append(minutes).append(" minutes");
-        }
-
-        if (seconds > 0) {
-            if (minutes > 0) {
-                sb.append(", ");
-            }
-
-            sb.append(seconds).append(" seconds");
-        }
-
-        if (millis > 0) {
-            if (seconds > 0) {
-                sb.append(", ");
-            }
-
-            sb.append(millis).append(" millis");
-        }
-        if (seconds == 0 && millis == 0) {
-            sb.append(nanos).append(" nanos");
-        }
-
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
deleted file mode 100644
index 63736ed..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-/**
- *
- * @author unattrib
- * @param <A>
- * @param <B>
- */
-public class Tuple<A, B> {
-
-    final A key;
-    final B value;
-
-    public Tuple(A key, B value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    public A getKey() {
-        return key;
-    }
-
-    public B getValue() {
-        return value;
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        if (other == null) {
-            return false;
-        }
-        if (other == this) {
-            return true;
-        }
-        if (!(other instanceof Tuple)) {
-            return false;
-        }
-
-        final Tuple<?, ?> tuple = (Tuple<?, ?>) other;
-        if (key == null) {
-            if (tuple.key != null) {
-                return false;
-            }
-        } else {
-            if (!key.equals(tuple.key)) {
-                return false;
-            }
-        }
-
-        if (value == null) {
-            if (tuple.value != null) {
-                return false;
-            }
-        } else {
-            if (!value.equals(tuple.value)) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return 581 + (this.key == null ? 0 : this.key.hashCode()) + (this.value == null ? 0 : this.value.hashCode());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
deleted file mode 100644
index a8d7e82..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.concurrency;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-/**
- * {@link DebuggableTimedLock} that delegates directly to the wrapped
- * {@link Lock} without recording any timing information.
- */
-public class DebugDisabledTimedLock implements DebuggableTimedLock {
-
-    private final Lock lock;
-
-    /**
-     * @param lock the lock to delegate to
-     */
-    public DebugDisabledTimedLock(final Lock lock) {
-        this.lock = lock;
-    }
-
-    /**
-     * Attempts to acquire the lock without waiting.
-     *
-     * @return {@code true} if the lock was acquired
-     */
-    @Override
-    public boolean tryLock() {
-        return lock.tryLock();
-    }
-
-    /**
-     * Attempts to acquire the lock, waiting up to the given amount of time.
-     *
-     * @param timeout the maximum time to wait
-     * @param timeUnit the unit of {@code timeout}
-     * @return {@code true} if the lock was acquired; {@code false} if the
-     * timeout elapsed or the thread was interrupted while waiting (in which
-     * case the thread's interrupt status is set again)
-     */
-    @Override
-    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
-        try {
-            return lock.tryLock(timeout, timeUnit);
-        } catch (InterruptedException e) {
-            // Restore the interrupt status so callers further up the stack can
-            // still observe that the thread was interrupted.
-            Thread.currentThread().interrupt();
-            return false;
-        }
-    }
-
-    /**
-     * Acquires the lock, blocking until it is available.
-     */
-    @Override
-    public void lock() {
-        lock.lock();
-    }
-
-    /**
-     * Releases the lock.
-     *
-     * @param task ignored by this implementation
-     */
-    @Override
-    public void unlock(final String task) {
-        lock.unlock();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
deleted file mode 100644
index f082168..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.concurrency;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link DebuggableTimedLock} that logs lock activity and accumulates, per
- * task name, how many times and for how long the wrapped {@link Lock} was
- * held. A summary is logged at debug level every {@code iterationFrequency}
- * releases of a given task.
- */
-public class DebugEnabledTimedLock implements DebuggableTimedLock {
-
-    private final Lock lock;
-    private final Logger logger;
-    private long lockTime = 0L;
-    // Whether lockTime holds the acquisition time of the current hold. Kept as a
-    // separate flag because System.nanoTime() may legitimately be zero or
-    // negative, so the timestamp itself cannot double as a sentinel.
-    private boolean lockTimeRecorded = false;
-
-    private final Map<String, Long> lockIterations = new HashMap<>();
-    private final Map<String, Long> lockNanos = new HashMap<>();
-
-    private final String name;
-    private final int iterationFrequency;
-
-    /**
-     * @param lock the lock to delegate to
-     * @param name the name used in log messages and in the logger name
-     * @param iterationFrequency a summary is logged each time the number of
-     * releases for a task is a multiple of this value
-     */
-    public DebugEnabledTimedLock(final Lock lock, final String name, final int iterationFrequency) {
-        this.lock = lock;
-        this.name = name;
-        this.iterationFrequency = iterationFrequency;
-        logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name);
-    }
-
-    /**
-     * Attempts to acquire the lock without waiting.
-     *
-     * @return {@code true} if the lock was acquired
-     */
-    @Override
-    public boolean tryLock() {
-        logger.trace("Trying to obtain Lock: {}", name);
-        final boolean success = lock.tryLock();
-        if (!success) {
-            logger.trace("TryLock failed for Lock: {}", name);
-            return false;
-        }
-        markAcquired();
-        logger.trace("TryLock successful");
-
-        return true;
-    }
-
-    /**
-     * Attempts to acquire the lock, waiting up to the given amount of time.
-     *
-     * @param timeout the maximum time to wait
-     * @param timeUnit the unit of {@code timeout}
-     * @return {@code true} if the lock was acquired; {@code false} if the
-     * timeout elapsed or the thread was interrupted while waiting (in which
-     * case the thread's interrupt status is set again)
-     */
-    @Override
-    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
-        logger.trace("Trying to obtain Lock {} with a timeout of {} {}", name, timeout, timeUnit);
-        final boolean success;
-        try {
-            success = lock.tryLock(timeout, timeUnit);
-        } catch (final InterruptedException ie) {
-            // Restore the interrupt status so callers can still observe it.
-            Thread.currentThread().interrupt();
-            return false;
-        }
-
-        if (!success) {
-            logger.trace("TryLock failed for Lock {} with a timeout of {} {}", name, timeout, timeUnit);
-            return false;
-        }
-        markAcquired();
-        logger.trace("TryLock successful");
-        return true;
-    }
-
-    /**
-     * Acquires the lock, blocking until it is available.
-     */
-    @Override
-    public void lock() {
-        logger.trace("Obtaining Lock {}", name);
-        lock.lock();
-        markAcquired();
-        logger.trace("Obtained Lock {}", name);
-    }
-
-    /**
-     * Releases the lock and adds the time it was held to the statistics kept
-     * for the given task.
-     *
-     * @param task the name under which the hold time is accumulated
-     */
-    @Override
-    public void unlock(final String task) {
-        if (!lockTimeRecorded) {
-            // No acquisition time is known for this hold, so there is nothing to record.
-            lock.unlock();
-            return;
-        }
-
-        logger.trace("Releasing Lock {}", name);
-        final long nanosLocked = System.nanoTime() - lockTime;
-
-        // The statistics maps are updated before the lock is released.
-        Long startIterations = lockIterations.get(task);
-        if (startIterations == null) {
-            startIterations = 0L;
-        }
-        final long iterations = startIterations + 1L;
-        lockIterations.put(task, iterations);
-
-        Long startNanos = lockNanos.get(task);
-        if (startNanos == null) {
-            startNanos = 0L;
-        }
-        final long totalNanos = startNanos + nanosLocked;
-        lockNanos.put(task, totalNanos);
-
-        lockTime = -1L;
-        lockTimeRecorded = false;
-
-        lock.unlock();
-        logger.trace("Released Lock {}", name);
-
-        // Guard against a frequency of 0, which would otherwise throw an
-        // ArithmeticException after the lock has already been released.
-        if (iterationFrequency != 0 && iterations % iterationFrequency == 0) {
-            logger.debug("Lock {} held for {} nanos for task: {}; total lock iterations: {}; total lock nanos: {}", name, nanosLocked, task, iterations, totalNanos);
-        }
-    }
-
-    // Records the acquisition time; called right after the lock has been obtained
-    // by any of lock(), tryLock() or tryLock(timeout, unit) so that all three are timed.
-    private void markAcquired() {
-        lockTime = System.nanoTime();
-        lockTimeRecorded = true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
deleted file mode 100644
index 69da6e8..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.concurrency;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Locking operations shared by {@link DebugEnabledTimedLock} and
- * {@link DebugDisabledTimedLock}. Unlike {@link java.util.concurrent.locks.Lock},
- * releasing the lock takes the name of the task that held it.
- */
-public interface DebuggableTimedLock {
-
-    /**
-     * Acquires the lock.
-     */
-    void lock();
-
-    /**
-     * Attempts to acquire the lock within the given amount of time.
-     *
-     * @param timePeriod the maximum time to wait
-     * @param timeUnit the unit of {@code timePeriod}
-     * @return {@code true} if the lock was acquired, {@code false} otherwise
-     */
-    boolean tryLock(long timePeriod, TimeUnit timeUnit);
-
-    /**
-     * Attempts to acquire the lock.
-     *
-     * @return {@code true} if the lock was acquired, {@code false} otherwise
-     */
-    boolean tryLock();
-
-    /**
-     * Releases the lock.
-     *
-     * @param task the name of the task that held the lock
-     */
-    void unlock(String task);
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
deleted file mode 100644
index 532d3c3..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.concurrency;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Facade over a {@link Lock} that routes every call to either a
- * {@link DebugEnabledTimedLock} or a {@link DebugDisabledTimedLock}, depending
- * on whether debug logging is enabled for this lock's logger at the time of
- * the call.
- */
-public class TimedLock {
-
-    private final DebugEnabledTimedLock instrumented;
-    private final DebugDisabledTimedLock plain;
-
-    private final Logger logger;
-
-    /**
-     * @param lock the lock to wrap
-     * @param name the name appended to the logger name and passed to the
-     * debug-enabled delegate
-     * @param iterationFrequency passed to the debug-enabled delegate
-     */
-    public TimedLock(final Lock lock, final String name, final int iterationFrequency) {
-        instrumented = new DebugEnabledTimedLock(lock, name, iterationFrequency);
-        plain = new DebugDisabledTimedLock(lock);
-
-        final String loggerName = TimedLock.class.getName() + "." + name;
-        logger = LoggerFactory.getLogger(loggerName);
-    }
-
-    // Chooses the delegate anew on every call, based on the current log level.
-    private DebuggableTimedLock delegate() {
-        if (logger.isDebugEnabled()) {
-            return instrumented;
-        }
-        return plain;
-    }
-
-    /**
-     * @return {@code true} if the lock was acquired
-     */
-    public boolean tryLock() {
-        final DebuggableTimedLock target = delegate();
-        return target.tryLock();
-    }
-
-    /**
-     * @param timeout the maximum time to wait
-     * @param timeUnit the unit of {@code timeout}
-     * @return {@code true} if the lock was acquired
-     */
-    public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
-        final DebuggableTimedLock target = delegate();
-        return target.tryLock(timeout, timeUnit);
-    }
-
-    /**
-     * Acquires the lock via the currently selected delegate.
-     */
-    public void lock() {
-        final DebuggableTimedLock target = delegate();
-        target.lock();
-    }
-
-    /**
-     * Releases the lock via the currently selected delegate.
-     *
-     * @param task the name of the task that held the lock
-     */
-    public void unlock(final String task) {
-        final DebuggableTimedLock target = delegate();
-        target.unlock(task);
-    }
-
-}


Mime
View raw message