Return-Path: X-Original-To: apmail-nifi-commits-archive@minotaur.apache.org Delivered-To: apmail-nifi-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D0673C42C for ; Fri, 16 Jan 2015 02:31:31 +0000 (UTC) Received: (qmail 83070 invoked by uid 500); 16 Jan 2015 02:31:33 -0000 Delivered-To: apmail-nifi-commits-archive@nifi.apache.org Received: (qmail 83017 invoked by uid 500); 16 Jan 2015 02:31:33 -0000 Mailing-List: contact commits-help@nifi.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.incubator.apache.org Delivered-To: mailing list commits@nifi.incubator.apache.org Received: (qmail 82842 invoked by uid 99); 16 Jan 2015 02:31:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Jan 2015 02:31:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 16 Jan 2015 02:30:57 +0000 Received: (qmail 76799 invoked by uid 99); 16 Jan 2015 02:29:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Jan 2015 02:29:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86099E0818; Fri, 16 Jan 2015 02:29:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: joewitt@apache.org To: commits@nifi.incubator.apache.org Date: Fri, 16 Jan 2015 02:29:55 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [38/51] [partial] incubator-nifi git commit: Reworked overall directory structure to make releasing nifi vs maven plugis easier X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java deleted file mode 100644 index ae075b5..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -public class LeakyBucketStreamThrottler implements StreamThrottler { - - private final int maxBytesPerSecond; - private final BlockingQueue requestQueue = new LinkedBlockingQueue(); - private final ScheduledExecutorService executorService; - private final AtomicBoolean shutdown = new AtomicBoolean(false); - - public LeakyBucketStreamThrottler(final int maxBytesPerSecond) { - this.maxBytesPerSecond = maxBytesPerSecond; - - executorService = Executors.newSingleThreadScheduledExecutor(); - final Runnable task = new Drain(); - executorService.scheduleAtFixedRate(task, 0, 1000, TimeUnit.MILLISECONDS); - } - - @Override - public void close() { - this.shutdown.set(true); - - executorService.shutdown(); - try { - // Should not take more than 2 seconds because we run every second. If it takes more than - // 2 seconds, it is because the Runnable thread is blocking on a write; in this case, - // we will just ignore it and return - executorService.awaitTermination(2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - } - } - - @Override - public OutputStream newThrottledOutputStream(final OutputStream toWrap) { - return new OutputStream() { - @Override - public void write(final int b) throws IOException { - write(new byte[]{(byte) b}, 0, 1); - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - final InputStream in = new ByteArrayInputStream(b, off, len); - LeakyBucketStreamThrottler.this.copy(in, toWrap); - } - - @Override - public void close() throws IOException { - toWrap.close(); - } - - @Override - public void flush() throws IOException { - toWrap.flush(); - } - }; - } - - @Override - public InputStream newThrottledInputStream(final InputStream toWrap) { - return new InputStream() { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - @Override - public int read() throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(1); - LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L); - if (baos.getBufferLength() < 1) { - return -1; - } - - return baos.getUnderlyingBuffer()[0] & 0xFF; - } - - @Override - public int read(final byte[] b) throws IOException { - if(b.length == 0){ - return 0; - } - return read(b, 0, b.length); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if ( len < 0 ) { - throw new IllegalArgumentException(); - } - if ( len == 0 ) { - return 0; - } - - baos.reset(); - final int copied = (int) LeakyBucketStreamThrottler.this.copy(toWrap, baos, len); - if (copied == 0) { - return -1; - } - System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied); - return copied; - } - - @Override - public void close() throws IOException { - toWrap.close(); - } - - @Override - public int available() throws IOException { - return toWrap.available(); - } - }; - } - - @Override - public long copy(final InputStream in, final OutputStream out) throws IOException { - return copy(in, out, -1); - } - - @Override - public long copy(final InputStream in, final OutputStream out, final long maxBytes) throws IOException { - long totalBytesCopied = 0; - boolean finished = false; - while (!finished) { - final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes - totalBytesCopied; - final Request request = new Request(in, out, requestMax); - boolean transferred = false; - while (!transferred) { - if (shutdown.get()) { - throw new IOException("Throttler shutdown"); - } - - try { - transferred = requestQueue.offer(request, 1000, TimeUnit.MILLISECONDS); - } catch (final InterruptedException e) { - throw new IOException("Interrupted", e); - } - } - - final BlockingQueue responseQueue = request.getResponseQueue(); - Response response = null; - while (response == null) { - try { - if (shutdown.get()) { - throw new IOException("Throttler shutdown"); - } - response = responseQueue.poll(1000L, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new IOException("Interrupted", e); - } - } - - if (!response.isSuccess()) { - throw response.getError(); - } - - totalBytesCopied += response.getBytesCopied(); - finished = (response.getBytesCopied() == 0) || (totalBytesCopied >= maxBytes && maxBytes > 0); - } - - return totalBytesCopied; - } - - /** - * This class is responsible for draining water from the leaky bucket. I.e., - * it actually moves the data - */ - private class Drain implements Runnable { - - private final byte[] buffer; - - public Drain() { - final int bufferSize = Math.min(4096, maxBytesPerSecond); - buffer = new byte[bufferSize]; - } - - @Override - public void run() { - final long start = System.currentTimeMillis(); - - int bytesTransferred = 0; - while (bytesTransferred < maxBytesPerSecond) { - final long maxMillisToWait = 1000 - (System.currentTimeMillis() - start); - if (maxMillisToWait < 1) { - return; - } - - try { - final Request request = requestQueue.poll(maxMillisToWait, TimeUnit.MILLISECONDS); - if (request == null) { - return; - } - - final BlockingQueue responseQueue = request.getResponseQueue(); - - final OutputStream out = request.getOutputStream(); - final InputStream in = request.getInputStream(); - - try { - final long requestMax = request.getMaxBytesToCopy(); - long maxBytesToTransfer; - if (requestMax < 0) { - maxBytesToTransfer = Math.min(buffer.length, maxBytesPerSecond - bytesTransferred); - } else { - maxBytesToTransfer = Math.min(requestMax, - Math.min(buffer.length, maxBytesPerSecond - bytesTransferred)); - } - maxBytesToTransfer = Math.max(1L, maxBytesToTransfer); - - final int bytesCopied = fillBuffer(in, maxBytesToTransfer); - out.write(buffer, 0, bytesCopied); - - final Response response = new Response(true, bytesCopied); - responseQueue.put(response); - bytesTransferred += bytesCopied; - } catch (final IOException e) { - final Response response = new Response(e); - responseQueue.put(response); - } - } catch (InterruptedException e) { - } - } - } - - private int fillBuffer(final InputStream in, final long maxBytes) throws IOException { - int bytesRead = 0; - int len; - while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, (int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) { - bytesRead += len; - } - - return bytesRead; - } - } - - private static class Response { - - private final boolean success; - private final IOException error; - private final int bytesCopied; - - public Response(final boolean success, final int bytesCopied) { - this.success = success; - this.bytesCopied = bytesCopied; - this.error = null; - } - - public Response(final IOException error) { - this.success = false; - this.error = error; - this.bytesCopied = -1; - } - - public boolean isSuccess() { - return success; - } - - public IOException getError() { - return error; - } - - public int getBytesCopied() { - return bytesCopied; - } - } - - private static class Request { - - private final OutputStream out; - private final InputStream in; - private final long maxBytesToCopy; - private final BlockingQueue responseQueue; - - public Request(final InputStream in, final OutputStream out, final long maxBytesToCopy) { - this.out = out; - this.in = in; - this.maxBytesToCopy = maxBytesToCopy; - this.responseQueue = new LinkedBlockingQueue(1); - } - - public BlockingQueue getResponseQueue() { - return this.responseQueue; - } - - public OutputStream getOutputStream() { - return out; - } - - public InputStream getInputStream() { - return in; - } - - public long getMaxBytesToCopy() { - return maxBytesToCopy; - } - - @Override - public String toString() { - return "Request[maxBytes=" + maxBytesToCopy + "]"; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java deleted file mode 100644 index 0e75a22..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -/** - * Wraps and InputStream so that the underlying InputStream cannot be closed. - * This is used so that the InputStream can be wrapped with yet another - * InputStream and prevent the outer layer from closing the inner InputStream - */ -public class NonCloseableInputStream extends FilterInputStream { - - private final InputStream toWrap; - - public NonCloseableInputStream(final InputStream toWrap) { - super(toWrap); - this.toWrap = toWrap; - } - - @Override - public int read() throws IOException { - return toWrap.read(); - } - - @Override - public int read(byte[] b) throws IOException { - return toWrap.read(b); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return toWrap.read(b, off, len); - } - - @Override - public void close() throws IOException { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java deleted file mode 100644 index 9c77637..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -public class NonCloseableOutputStream extends FilterOutputStream { - - private final OutputStream out; - - public NonCloseableOutputStream(final OutputStream out) { - super(out); - this.out = out; - } - - @Override - public void write(byte[] b) throws IOException { - out.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - } - - @Override - public void write(int b) throws IOException { - out.write(b); - } - - @Override - public void close() throws IOException { - out.flush(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java deleted file mode 100644 index 8452761..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * OutputStream that throws away all data, just like as if writing to /dev/null - */ -public class NullOutputStream extends OutputStream { - - @Override - public void write(final int b) throws IOException { - } - - @Override - public void write(final byte[] b) throws IOException { - } - - @Override - public void write(final byte[] b, int off, int len) throws IOException { - } - - @Override - public void close() throws IOException { - } - - @Override - public void flush() throws IOException { - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java deleted file mode 100644 index 9158050..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -public interface StreamThrottler extends Closeable { - - long copy(InputStream in, OutputStream out) throws IOException; - - long copy(InputStream in, OutputStream out, long maxBytes) throws IOException; - - InputStream newThrottledInputStream(final InputStream toWrap); - - OutputStream newThrottledOutputStream(final OutputStream toWrap); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java deleted file mode 100644 index 8e3d606..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; - -import org.apache.nifi.stream.io.exception.BytePatternNotFoundException; -import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; - -public class StreamUtils { - - public static long copy(final InputStream source, final OutputStream destination) throws IOException { - final byte[] buffer = new byte[8192]; - int len; - long totalCount = 0L; - while ((len = source.read(buffer)) > 0) { - destination.write(buffer, 0, len); - totalCount += len; - } - return totalCount; - } - - /** - * Copies numBytes from source to - * destination. If numBytes are not available from - * source, throws EOFException - * - * @param source - * @param destination - * @param numBytes - * @throws IOException - */ - public static void copy(final InputStream source, final OutputStream destination, final long numBytes) throws IOException { - final byte[] buffer = new byte[8192]; - int len; - long bytesLeft = numBytes; - while ((len = source.read(buffer, 0, (int) Math.min(bytesLeft, buffer.length))) > 0) { - destination.write(buffer, 0, len); - bytesLeft -= len; - } - - if (bytesLeft > 0) { - throw new EOFException("Attempted to copy " + numBytes + " bytes but only " + (numBytes - bytesLeft) + " bytes were available"); - } - } - - /** - * Reads data from the given input stream, copying it to the destination - * byte array. If the InputStream has less data than the given byte array, - * throws an EOFException - * - * @param source - * @param destination - * @throws IOException - */ - public static void fillBuffer(final InputStream source, final byte[] destination) throws IOException { - fillBuffer(source, destination, true); - } - - /** - * Reads data from the given input stream, copying it to the destination - * byte array. If the InputStream has less data than the given byte array, - * throws an EOFException if ensureCapacity is true and - * otherwise returns the number of bytes copied - * - * @param source - * @param destination - * @param ensureCapacity whether or not to enforce that the InputStream have - * at least as much data as the capacity of the destination byte array - * @return - * @throws IOException - */ - public static int fillBuffer(final InputStream source, final byte[] destination, final boolean ensureCapacity) throws IOException { - int bytesRead = 0; - int len; - while (bytesRead < destination.length) { - len = source.read(destination, bytesRead, destination.length - bytesRead); - if (len < 0) { - if (ensureCapacity) { - throw new EOFException(); - } else { - break; - } - } - - bytesRead += len; - } - - return bytesRead; - } - - /** - * Copies data from in to out until either we are out of data (returns null) - * or we hit one of the byte patterns identified by the - * stoppers parameter (returns the byte pattern matched). The - * bytes in the stopper will be copied. - * - * @param in - * @param out - * @param maxBytes - * @param stoppers - * @return the byte array matched, or null if end of stream was reached - * @throws IOException - */ - public static byte[] copyInclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException { - if (stoppers.length == 0) { - return null; - } - - final List circularBuffers = new ArrayList(); - for (final byte[] stopper : stoppers) { - circularBuffers.add(new NonThreadSafeCircularBuffer(stopper)); - } - - long bytesRead = 0; - while (true) { - final int next = in.read(); - if (next == -1) { - return null; - } else if (maxBytes > 0 && ++bytesRead >= maxBytes) { - throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format"); - } - - out.write(next); - - for (final NonThreadSafeCircularBuffer circ : circularBuffers) { - if (circ.addAndCompare((byte) next)) { - return circ.getByteArray(); - } - } - } - } - - /** - * Copies data from in to out until either we are out of data (returns null) - * or we hit one of the byte patterns identified by the - * stoppers parameter (returns the byte pattern matched). The - * byte pattern matched will NOT be copied to the output and will be un-read - * from the input. - * - * @param in - * @param out - * @param maxBytes - * @param stoppers - * @return the byte array matched, or null if end of stream was reached - * @throws IOException - */ - public static byte[] copyExclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException { - if (stoppers.length == 0) { - return null; - } - - int longest = 0; - NonThreadSafeCircularBuffer longestBuffer = null; - final List circularBuffers = new ArrayList(); - for (final byte[] stopper : stoppers) { - final NonThreadSafeCircularBuffer circularBuffer = new NonThreadSafeCircularBuffer(stopper); - if (stopper.length > longest) { - longest = stopper.length; - longestBuffer = circularBuffer; - circularBuffers.add(0, circularBuffer); - } else { - circularBuffers.add(circularBuffer); - } - } - - long bytesRead = 0; - while (true) { - final int next = in.read(); - if (next == -1) { - return null; - } else if (maxBytes > 0 && bytesRead++ > maxBytes) { - throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format"); - } - - for (final NonThreadSafeCircularBuffer circ : circularBuffers) { - if (circ.addAndCompare((byte) next)) { - // The longest buffer has some data that may not have been written out yet; we need to make sure - // that we copy out those bytes. - final int bytesToCopy = longest - circ.getByteArray().length; - for (int i = 0; i < bytesToCopy; i++) { - final int oldestByte = longestBuffer.getOldestByte(); - if (oldestByte != -1) { - out.write(oldestByte); - longestBuffer.addAndCompare((byte) 0); - } - } - - return circ.getByteArray(); - } - } - - if (longestBuffer.isFilled()) { - out.write(longestBuffer.getOldestByte()); - } - } - } - - /** - * Skips the specified number of bytes from the InputStream - * - * If unable to skip that number of bytes, throws EOFException - * - * @param stream - * @param bytesToSkip - * @throws IOException - */ - public static void skip(final InputStream stream, final long bytesToSkip) throws IOException { - if (bytesToSkip <= 0) { - return; - } - long totalSkipped = 0L; - - // If we have a FileInputStream, calling skip(1000000) will return 1000000 even if the file is only - // 3 bytes. As a result, we will skip 1 less than the number requested, and then read the last - // byte in order to make sure that we've consumed the number of bytes requested. We then check that - // the final byte, which we read, is not -1. - final long actualBytesToSkip = bytesToSkip - 1; - while (totalSkipped < actualBytesToSkip) { - final long skippedThisIteration = stream.skip(actualBytesToSkip - totalSkipped); - if (skippedThisIteration == 0) { - final int nextByte = stream.read(); - if (nextByte == -1) { - throw new EOFException(); - } else { - totalSkipped++; - } - } - - totalSkipped += skippedThisIteration; - } - - final int lastByte = stream.read(); - if (lastByte == -1) { - throw new EOFException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java deleted file mode 100644 index 2b9050d..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.OutputStream; - -/** - * This class extends the {@link java.util.zip.ZipOutputStream} by providing a - * constructor that allows the user to specify the compression level. The - * default compression level is 1, as opposed to Java's default of 5. - */ -public class ZipOutputStream extends java.util.zip.ZipOutputStream { - - public static final int DEFAULT_COMPRESSION_LEVEL = 1; - - public ZipOutputStream(final OutputStream out) { - this(out, DEFAULT_COMPRESSION_LEVEL); - } - - public ZipOutputStream(final OutputStream out, final int compressionLevel) { - super(out); - def.setLevel(compressionLevel); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java deleted file mode 100644 index 5d08616..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io.exception; - -import java.io.IOException; - -public class BytePatternNotFoundException extends IOException { - - private static final long serialVersionUID = -4128911284318513973L; - - public BytePatternNotFoundException(final String explanation) { - super(explanation); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java deleted file mode 100644 index b4b4c17..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io.util; - -import java.util.Arrays; - -public class NonThreadSafeCircularBuffer { - - private final byte[] lookingFor; - private final int[] buffer; - private int insertionPointer = 0; - private int bufferSize = 0; - - public NonThreadSafeCircularBuffer(final byte[] lookingFor) { - this.lookingFor = lookingFor; - buffer = new int[lookingFor.length]; - Arrays.fill(buffer, -1); - } - - public byte[] getByteArray() { - return lookingFor; - } - - /** - * Returns the oldest byte in the buffer - * - * @return - */ - public int getOldestByte() { - return buffer[insertionPointer]; - } - - public boolean isFilled() { - return bufferSize >= buffer.length; - } - - public boolean addAndCompare(final byte data) { - buffer[insertionPointer] = data; - insertionPointer = (insertionPointer + 1) % lookingFor.length; - - bufferSize++; - if (bufferSize < lookingFor.length) { - return false; - } - - for (int i = 0; i < lookingFor.length; i++) { - final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length]; - if (compare != lookingFor[i]) { - return false; - } - } - - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java deleted file mode 100644 index 92061e0..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/BooleanHolder.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -public class BooleanHolder extends ObjectHolder { - - public BooleanHolder(final boolean initialValue) { - super(initialValue); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java deleted file mode 100644 index 805223f..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class FormatUtils { - - private static final String UNION = "|"; - - // for Data Sizes - private static final double BYTES_IN_KILOBYTE = 1024; - private static final double BYTES_IN_MEGABYTE = BYTES_IN_KILOBYTE * 1024; - private static final double BYTES_IN_GIGABYTE = BYTES_IN_MEGABYTE * 1024; - private static final double BYTES_IN_TERABYTE = BYTES_IN_GIGABYTE * 1024; - - // for Time Durations - private static final String NANOS = join(UNION, "ns", "nano", "nanos", "nanoseconds"); - private static final String MILLIS = join(UNION, "ms", "milli", "millis", "milliseconds"); - private static final String SECS = join(UNION, "s", "sec", "secs", "second", "seconds"); - private static final String MINS = join(UNION, "m", "min", "mins", "minute", "minutes"); - private static final String HOURS = join(UNION, "h", "hr", "hrs", "hour", "hours"); - private static final String DAYS = join(UNION, "d", "day", "days"); - - private static final String VALID_TIME_UNITS = join(UNION, NANOS, MILLIS, SECS, MINS, HOURS, DAYS); - public static final String TIME_DURATION_REGEX = "(\\d+)\\s*(" + VALID_TIME_UNITS + ")"; - public static final Pattern TIME_DURATION_PATTERN = Pattern.compile(TIME_DURATION_REGEX); - - /** - * Formats the specified count by adding commas. - * - * @param count - * @return - */ - public static String formatCount(final long count) { - return NumberFormat.getIntegerInstance().format(count); - } - - /** - * Formats the specified duration in 'mm:ss.SSS' format. - * - * @param sourceDuration - * @param sourceUnit - * @return - */ - public static String formatMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) { - final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit); - final SimpleDateFormat formatter = new SimpleDateFormat("mm:ss.SSS", Locale.US); - return formatter.format(new Date(millis)); - } - - /** - * Formats the specified duration in 'HH:mm:ss.SSS' format. - * - * @param sourceDuration - * @param sourceUnit - * @return - */ - public static String formatHoursMinutesSeconds(final long sourceDuration, final TimeUnit sourceUnit) { - final long millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceUnit); - final long millisInHour = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); - final int hours = (int) (millis / millisInHour); - final long whatsLeft = millis - hours * millisInHour; - - return pad(hours) + ":" + new SimpleDateFormat("mm:ss.SSS", Locale.US).format(new Date(whatsLeft)); - } - - private static String pad(final int val) { - return (val < 10) ? "0" + val : String.valueOf(val); - } - - /** - * Formats the specified data size in human readable format. - * - * @param dataSize Data size in bytes - * @return Human readable format - */ - public static String formatDataSize(final double dataSize) { - // initialize the formatter - final NumberFormat format = NumberFormat.getNumberInstance(); - format.setMaximumFractionDigits(2); - - // check terabytes - double dataSizeToFormat = dataSize / BYTES_IN_TERABYTE; - if (dataSizeToFormat > 1) { - return format.format(dataSizeToFormat) + " TB"; - } - - // check gigabytes - dataSizeToFormat = dataSize / BYTES_IN_GIGABYTE; - if (dataSizeToFormat > 1) { - return format.format(dataSizeToFormat) + " GB"; - } - - // check megabytes - dataSizeToFormat = dataSize / BYTES_IN_MEGABYTE; - if (dataSizeToFormat > 1) { - return format.format(dataSizeToFormat) + " MB"; - } - - // check kilobytes - dataSizeToFormat = dataSize / BYTES_IN_KILOBYTE; - if (dataSizeToFormat > 1) { - return format.format(dataSizeToFormat) + " KB"; - } - - // default to bytes - return format.format(dataSize) + " bytes"; - } - - public static long getTimeDuration(final String value, final TimeUnit desiredUnit) { - final Matcher matcher = TIME_DURATION_PATTERN.matcher(value.toLowerCase()); - if (!matcher.matches()) { - throw new IllegalArgumentException("Value '" + value + "' is not a valid Time Duration"); - } - - final String duration = matcher.group(1); - final String units = matcher.group(2); - TimeUnit specifiedTimeUnit = null; - switch (units.toLowerCase()) { - case "ns": - case "nano": - case "nanos": - case "nanoseconds": - specifiedTimeUnit = TimeUnit.NANOSECONDS; - break; - case "ms": - case "milli": - case "millis": - case "milliseconds": - specifiedTimeUnit = TimeUnit.MILLISECONDS; - break; - case "s": - case "sec": - case "secs": - case "second": - case "seconds": - specifiedTimeUnit = TimeUnit.SECONDS; - break; - case "m": - case "min": - case "mins": - case "minute": - case "minutes": - specifiedTimeUnit = TimeUnit.MINUTES; - break; - case "h": - case "hr": - case "hrs": - case "hour": - case "hours": - specifiedTimeUnit = TimeUnit.HOURS; - break; - case "d": - case "day": - case "days": - specifiedTimeUnit = TimeUnit.DAYS; - break; - } - - final long durationVal = Long.parseLong(duration); - return desiredUnit.convert(durationVal, specifiedTimeUnit); - } - - public static String formatUtilization(final double utilization) { - return utilization + "%"; - } - - private static String join(final String delimiter, final String... values) { - if (values.length == 0) { - return ""; - } else if (values.length == 1) { - return values[0]; - } - - final StringBuilder sb = new StringBuilder(); - sb.append(values[0]); - for (int i = 1; i < values.length; i++) { - sb.append(delimiter).append(values[i]); - } - - return sb.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java deleted file mode 100644 index 213bbc0..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/IntegerHolder.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -public class IntegerHolder extends ObjectHolder { - - public IntegerHolder(final int initialValue) { - super(initialValue); - } - - public int addAndGet(final int delta) { - final int curValue = get(); - final int newValue = curValue + delta; - set(newValue); - return newValue; - } - - public int getAndAdd(final int delta) { - final int curValue = get(); - final int newValue = curValue + delta; - set(newValue); - return curValue; - } - - public int incrementAndGet() { - return addAndGet(1); - } - - public int getAndIncrement() { - return getAndAdd(1); - } - - public int decrementAndGet() { - return addAndGet(-1); - } - - public int getAndDecrement() { - return getAndAdd(-1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java deleted file mode 100644 index ef70ce8..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/LongHolder.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -/** - * Wraps a Long value so that it can be declared final and still be - * accessed from which inner classes; the functionality is similar to that of an - * AtomicLong, but operations on this class are not atomic. This results in - * greater performance when the atomicity is not needed. - */ -public class LongHolder extends ObjectHolder { - - public LongHolder(final long initialValue) { - super(initialValue); - } - - public long addAndGet(final long delta) { - final long curValue = get(); - final long newValue = curValue + delta; - set(newValue); - return newValue; - } - - public long getAndAdd(final long delta) { - final long curValue = get(); - final long newValue = curValue + delta; - set(newValue); - return curValue; - } - - public long incrementAndGet() { - return addAndGet(1); - } - - public long getAndIncrement() { - return getAndAdd(1); - } - - public long decrementAndGet() { - return addAndGet(-1L); - } - - public long getAndDecrement() { - return getAndAdd(-1L); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java deleted file mode 100644 index 85bfd96..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.util.Arrays; - -/** - *

- * A RingBuffer that can be used to scan byte sequences for subsequences. - *

- * - *

- * This class implements an efficient naive search algorithm, which allows the - * user of the library to identify byte sequences in a stream on-the-fly so that - * the stream can be segmented without having to buffer the data. - *

- * - *

- * The intended usage paradigm is: - * - *

- * final byte[] searchSequence = ...;
- * final CircularBuffer buffer = new CircularBuffer(searchSequence);
- * while ((int nextByte = in.read()) > 0) {
- *      if ( buffer.addAndCompare(nextByte) ) {
- *          // This byte is the last byte in the given sequence
- *      } else {
- *          // This byte does not complete the given sequence
- *      }
- * }
- * 
- * - *

- */ -public class NaiveSearchRingBuffer { - - private final byte[] lookingFor; - private final int[] buffer; - private int insertionPointer = 0; - private int bufferSize = 0; - - public NaiveSearchRingBuffer(final byte[] lookingFor) { - this.lookingFor = lookingFor; - this.buffer = new int[lookingFor.length]; - Arrays.fill(buffer, -1); - } - - /** - * Returns the contents of the internal buffer, which represents the last X - * bytes added to the buffer, where X is the minimum of the number of bytes - * added to the buffer or the length of the byte sequence for which we are - * looking - * - * @return - */ - public byte[] getBufferContents() { - final int contentLength = Math.min(lookingFor.length, bufferSize); - final byte[] contents = new byte[contentLength]; - for (int i = 0; i < contentLength; i++) { - final byte nextByte = (byte) buffer[(insertionPointer + i) % lookingFor.length]; - contents[i] = nextByte; - } - return contents; - } - - /** - * Returns the oldest byte in the buffer - * - * @return - */ - public int getOldestByte() { - return buffer[insertionPointer]; - } - - /** - * Returns true if the number of bytes that have been added to - * the buffer is at least equal to the length of the byte sequence for which - * we are searching - * - * @return - */ - public boolean isFilled() { - return bufferSize >= buffer.length; - } - - /** - * Clears the internal buffer so that a new search may begin - */ - public void clear() { - Arrays.fill(buffer, -1); - insertionPointer = 0; - bufferSize = 0; - } - - /** - * Add the given byte to the buffer and notify whether or not the byte - * completes the desired byte sequence. - * - * @param data - * @return true if this byte completes the byte sequence, - * false otherwise. - */ - public boolean addAndCompare(final byte data) { - buffer[insertionPointer] = data; - insertionPointer = (insertionPointer + 1) % lookingFor.length; - - bufferSize++; - if (bufferSize < lookingFor.length) { - return false; - } - - for (int i = 0; i < lookingFor.length; i++) { - final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length]; - if (compare != lookingFor[i]) { - return false; - } - } - - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java deleted file mode 100644 index a58ec6a..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/ObjectHolder.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -/** - * A bean that holds a single value of type T. - * - * @param - */ -public class ObjectHolder { - - private T value; - - public ObjectHolder(final T initialValue) { - this.value = initialValue; - } - - public T get() { - return value; - } - - public void set(T value) { - this.value = value; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java deleted file mode 100644 index c0bb830..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * Thread-safe implementation of a RingBuffer - * - * @param - */ -public class RingBuffer { - - private final Object[] buffer; - private int insertionPointer = 0; - private boolean filled = false; - - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); - - public RingBuffer(final int size) { - buffer = new Object[size]; - } - - /** - * Adds the given value to the RingBuffer and returns the value that was - * removed in order to make room. - * - * @param value - * @return - */ - @SuppressWarnings("unchecked") - public T add(final T value) { - Objects.requireNonNull(value); - - writeLock.lock(); - try { - final Object removed = buffer[insertionPointer]; - - buffer[insertionPointer] = value; - - if (insertionPointer == buffer.length - 1) { - filled = true; - } - - insertionPointer = (insertionPointer + 1) % buffer.length; - return (T) removed; - } finally { - writeLock.unlock(); - } - } - - public int getSize() { - readLock.lock(); - try { - return filled ? buffer.length : insertionPointer; - } finally { - readLock.unlock(); - } - } - - public List getSelectedElements(final Filter filter) { - return getSelectedElements(filter, Integer.MAX_VALUE); - } - - public List getSelectedElements(final Filter filter, final int maxElements) { - final List selected = new ArrayList<>(1000); - int numSelected = 0; - readLock.lock(); - try { - for (int i = 0; i < buffer.length && numSelected < maxElements; i++) { - final int idx = (insertionPointer + i) % buffer.length; - final Object val = buffer[idx]; - if (val == null) { - continue; - } - - @SuppressWarnings("unchecked") - final T element = (T) val; - if (filter.select(element)) { - selected.add(element); - numSelected++; - } - } - } finally { - readLock.unlock(); - } - return selected; - } - - public int countSelectedElements(final Filter filter) { - int numSelected = 0; - readLock.lock(); - try { - for (int i = 0; i < buffer.length; i++) { - final int idx = (insertionPointer + i) % buffer.length; - final Object val = buffer[idx]; - if (val == null) { - continue; - } - - @SuppressWarnings("unchecked") - final T element = (T) val; - if (filter.select(element)) { - numSelected++; - } - } - } finally { - readLock.unlock(); - } - - return numSelected; - } - - /** - * Removes all elements from the RingBuffer that match the given filter - * - * @param filter - * @return - */ - public int removeSelectedElements(final Filter filter) { - int count = 0; - - writeLock.lock(); - try { - for (int i = 0; i < buffer.length; i++) { - final int idx = (insertionPointer + i + 1) % buffer.length; - final Object val = buffer[idx]; - if (val == null) { - continue; - } - - @SuppressWarnings("unchecked") - final T element = (T) val; - - if (filter.select(element)) { - buffer[idx] = null; - } - } - } finally { - writeLock.unlock(); - } - - return count; - } - - public List asList() { - return getSelectedElements(new Filter() { - @Override - public boolean select(final T value) { - return true; - } - }); - } - - public T getOldestElement() { - readLock.lock(); - try { - return getElementData(insertionPointer); - } finally { - readLock.unlock(); - } - } - - public T getNewestElement() { - readLock.lock(); - try { - int index = (insertionPointer == 0) ? buffer.length : insertionPointer - 1; - return getElementData(index); - } finally { - readLock.unlock(); - } - } - - @SuppressWarnings("unchecked") - private T getElementData(final int index) { - readLock.lock(); - try { - return (T) buffer[index]; - } finally { - readLock.unlock(); - } - } - - /** - * Iterates over each element in the RingBuffer, calling the - * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element - * in the RingBuffer. If the Evaluator returns {@code false}, the method - * will skip all remaining elements in the RingBuffer; otherwise, the next - * element will be evaluated until all elements have been evaluated. - * - * @param evaluator - */ - public void forEach(final ForEachEvaluator evaluator) { - forEach(evaluator, IterationDirection.FORWARD); - } - - /** - * Iterates over each element in the RingBuffer, calling the - * {@link ForEachEvaluator#evaluate(Object) evaluate} method on each element - * in the RingBuffer. If the Evaluator returns {@code false}, the method - * will skip all remaining elements in the RingBuffer; otherwise, the next - * element will be evaluated until all elements have been evaluated. - * - * @param evaluator - * @param iterationDirection the order in which to iterate over the elements - * in the RingBuffer - */ - public void forEach(final ForEachEvaluator evaluator, final IterationDirection iterationDirection) { - readLock.lock(); - try { - final int startIndex; - final int endIndex; - final int increment; - - if (iterationDirection == IterationDirection.FORWARD) { - startIndex = 0; - endIndex = buffer.length - 1; - increment = 1; - } else { - startIndex = buffer.length - 1; - endIndex = 0; - increment = -1; - } - - for (int i = startIndex; (iterationDirection == IterationDirection.FORWARD ? i <= endIndex : i >= endIndex); i += increment) { - final int idx = (insertionPointer + i) % buffer.length; - final Object val = buffer[idx]; - if (val == null) { - continue; - } - - @SuppressWarnings("unchecked") - final T element = (T) val; - if (!evaluator.evaluate(element)) { - return; - } - } - } finally { - readLock.unlock(); - } - } - - public static interface Filter { - - boolean select(S value); - } - - /** - * Defines an interface that can be used to iterate over all of the elements - * in the RingBuffer via the {@link #forEach} method - * - * @param - */ - public static interface ForEachEvaluator { - - /** - * Evaluates the given element and returns {@code true} if the next - * element should be evaluated, {@code false} otherwise - * - * @param value - * @return - */ - boolean evaluate(S value); - } - - public static enum IterationDirection { - - FORWARD, - BACKWARD; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java deleted file mode 100644 index cd11930..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/StopWatch.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.util.concurrent.TimeUnit; - -public final class StopWatch { - - private long startNanos = -1L; - private long duration = -1L; - - /** - * Creates a StopWatch but does not start it - */ - public StopWatch() { - this(false); - } - - /** - * @param autoStart whether or not the timer should be started automatically - */ - public StopWatch(final boolean autoStart) { - if (autoStart) { - start(); - } - } - - public void start() { - this.startNanos = System.nanoTime(); - this.duration = -1L; - } - - public void stop() { - if (startNanos < 0) { - throw new IllegalStateException("StopWatch has not been started"); - } - this.duration = System.nanoTime() - startNanos; - this.startNanos = -1L; - } - - /** - * Returns the amount of time that the StopWatch was running. - * - * @param timeUnit - * @return - * - * @throws IllegalStateException if the StopWatch has not been stopped via - * {@link #stop()} - */ - public long getDuration(final TimeUnit timeUnit) { - if (duration < 0) { - throw new IllegalStateException("Cannot get duration until StopWatch has been stopped"); - } - return timeUnit.convert(duration, TimeUnit.NANOSECONDS); - } - - /** - * Returns the amount of time that has elapsed since the timer was started. - * - * @param timeUnit - * @return - */ - public long getElapsed(final TimeUnit timeUnit) { - return timeUnit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS); - } - - public String calculateDataRate(final long bytes) { - final double seconds = (double) duration / 1000000000.0D; - final long dataSize = (long) (bytes / seconds); - return FormatUtils.formatDataSize(dataSize) + "/sec"; - } - - public String getDuration() { - final StringBuilder sb = new StringBuilder(); - - long duration = this.duration; - final long minutes = (duration > 60000000000L) ? (duration / 60000000000L) : 0L; - duration -= TimeUnit.NANOSECONDS.convert(minutes, TimeUnit.MINUTES); - - final long seconds = (duration > 1000000000L) ? (duration / 1000000000L) : 0L; - duration -= TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS); - - final long millis = (duration > 1000000L) ? (duration / 1000000L) : 0L; - duration -= TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS); - - final long nanos = duration % 1000000L; - - if (minutes > 0) { - sb.append(minutes).append(" minutes"); - } - - if (seconds > 0) { - if (minutes > 0) { - sb.append(", "); - } - - sb.append(seconds).append(" seconds"); - } - - if (millis > 0) { - if (seconds > 0) { - sb.append(", "); - } - - sb.append(millis).append(" millis"); - } - if (seconds == 0 && millis == 0) { - sb.append(nanos).append(" nanos"); - } - - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java deleted file mode 100644 index 63736ed..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/Tuple.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -/** - * - * @author unattrib - * @param - * @param - */ -public class Tuple { - - final A key; - final B value; - - public Tuple(A key, B value) { - this.key = key; - this.value = value; - } - - public A getKey() { - return key; - } - - public B getValue() { - return value; - } - - @Override - public boolean equals(final Object other) { - if (other == null) { - return false; - } - if (other == this) { - return true; - } - if (!(other instanceof Tuple)) { - return false; - } - - final Tuple tuple = (Tuple) other; - if (key == null) { - if (tuple.key != null) { - return false; - } - } else { - if (!key.equals(tuple.key)) { - return false; - } - } - - if (value == null) { - if (tuple.value != null) { - return false; - } - } else { - if (!value.equals(tuple.value)) { - return false; - } - } - - return true; - } - - @Override - public int hashCode() { - return 581 + (this.key == null ? 0 : this.key.hashCode()) + (this.value == null ? 0 : this.value.hashCode()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java deleted file mode 100644 index a8d7e82..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugDisabledTimedLock.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.concurrency; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - -public class DebugDisabledTimedLock implements DebuggableTimedLock { - - private final Lock lock; - - public DebugDisabledTimedLock(final Lock lock) { - this.lock = lock; - } - - /** - * - * @return - */ - @Override - public boolean tryLock() { - return lock.tryLock(); - } - - /** - * - * @param timeout - * @param timeUnit - * @return - */ - @Override - public boolean tryLock(final long timeout, final TimeUnit timeUnit) { - try { - return lock.tryLock(timeout, timeUnit); - } catch (InterruptedException e) { - return false; - } - } - - /** - * - */ - @Override - public void lock() { - lock.lock(); - } - - @Override - public void unlock(final String task) { - lock.unlock(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java deleted file mode 100644 index f082168..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebugEnabledTimedLock.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.concurrency; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DebugEnabledTimedLock implements DebuggableTimedLock { - - private final Lock lock; - private final Logger logger; - private long lockTime = 0L; - - private final Map lockIterations = new HashMap<>(); - private final Map lockNanos = new HashMap<>(); - - private final String name; - private final int iterationFrequency; - - public DebugEnabledTimedLock(final Lock lock, final String name, final int iterationFrequency) { - this.lock = lock; - this.name = name; - this.iterationFrequency = iterationFrequency; - logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name); - } - - /** - * - * @return - */ - @Override - public boolean tryLock() { - logger.trace("Trying to obtain Lock: {}", name); - final boolean success = lock.tryLock(); - if (!success) { - logger.trace("TryLock failed for Lock: {}", name); - return false; - } - logger.trace("TryLock successful"); - - return true; - } - - /** - * - * @param timeout - * @param timeUnit - * @return - */ - @Override - public boolean tryLock(final long timeout, final TimeUnit timeUnit) { - logger.trace("Trying to obtain Lock {} with a timeout of {} {}", name, timeout, timeUnit); - final boolean success; - try { - success = lock.tryLock(timeout, timeUnit); - } catch (final InterruptedException ie) { - return false; - } - - if (!success) { - logger.trace("TryLock failed for Lock {} with a timeout of {} {}", name, timeout, timeUnit); - return false; - } - logger.trace("TryLock successful"); - return true; - } - - /** - * - */ - @Override - public void lock() { - logger.trace("Obtaining Lock {}", name); - lock.lock(); - lockTime = System.nanoTime(); - logger.trace("Obtained Lock {}", name); - } - - /** - * - * @param task - */ - @Override - public void unlock(final String task) { - if (lockTime <= 0L) { - lock.unlock(); - return; - } - - logger.trace("Releasing Lock {}", name); - final long nanosLocked = System.nanoTime() - lockTime; - - Long startIterations = lockIterations.get(task); - if (startIterations == null) { - startIterations = 0L; - } - final long iterations = startIterations + 1L; - lockIterations.put(task, iterations); - - Long startNanos = lockNanos.get(task); - if (startNanos == null) { - startNanos = 0L; - } - final long totalNanos = startNanos + nanosLocked; - lockNanos.put(task, totalNanos); - - lockTime = -1L; - - lock.unlock(); - logger.trace("Released Lock {}", name); - - if (iterations % iterationFrequency == 0) { - logger.debug("Lock {} held for {} nanos for task: {}; total lock iterations: {}; total lock nanos: {}", name, nanosLocked, task, iterations, totalNanos); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java deleted file mode 100644 index 69da6e8..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/DebuggableTimedLock.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.concurrency; - -import java.util.concurrent.TimeUnit; - -public interface DebuggableTimedLock { - - void lock(); - - boolean tryLock(long timePeriod, TimeUnit timeUnit); - - boolean tryLock(); - - void unlock(String task); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java deleted file mode 100644 index 532d3c3..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/util/concurrency/TimedLock.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util.concurrency; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TimedLock { - - private final DebugEnabledTimedLock enabled; - private final DebugDisabledTimedLock disabled; - - private final Logger logger; - - public TimedLock(final Lock lock, final String name, final int iterationFrequency) { - this.enabled = new DebugEnabledTimedLock(lock, name, iterationFrequency); - this.disabled = new DebugDisabledTimedLock(lock); - - logger = LoggerFactory.getLogger(TimedLock.class.getName() + "." + name); - } - - private DebuggableTimedLock getLock() { - return logger.isDebugEnabled() ? enabled : disabled; - } - - public boolean tryLock() { - return getLock().tryLock(); - } - - public boolean tryLock(final long timeout, final TimeUnit timeUnit) { - return getLock().tryLock(timeout, timeUnit); - } - - public void lock() { - getLock().lock(); - } - - public void unlock(final String task) { - getLock().unlock(task); - } - -}