Return-Path: X-Original-To: apmail-nifi-commits-archive@minotaur.apache.org Delivered-To: apmail-nifi-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5572517E8D for ; Mon, 19 Jan 2015 18:16:31 +0000 (UTC) Received: (qmail 29603 invoked by uid 500); 19 Jan 2015 18:16:33 -0000 Delivered-To: apmail-nifi-commits-archive@nifi.apache.org Received: (qmail 29554 invoked by uid 500); 19 Jan 2015 18:16:33 -0000 Mailing-List: contact commits-help@nifi.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.incubator.apache.org Delivered-To: mailing list commits@nifi.incubator.apache.org Received: (qmail 29539 invoked by uid 99); 19 Jan 2015 18:16:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Jan 2015 18:16:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 19 Jan 2015 18:16:04 +0000 Received: (qmail 26158 invoked by uid 99); 19 Jan 2015 18:15:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Jan 2015 18:15:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5AF03E0861; Mon, 19 Jan 2015 18:15:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mcgilman@apache.org To: commits@nifi.incubator.apache.org Date: Mon, 19 Jan 2015 18:16:14 -0000 Message-Id: <9b1ae3c437184acd860be9512722ecf7@git.apache.org> In-Reply-To: <814206d3c3af40e3a2b9b0eff8ebb390@git.apache.org> References: <814206d3c3af40e3a2b9b0eff8ebb390@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [40/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall directory structure to make releasing nifi vs maven plugis easier X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java deleted file mode 100644 index 8a8b7c0..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.MulticastSocket; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author unattributed - */ -public final class MulticastUtils { - - private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastUtils.class)); - - public static MulticastSocket createMulticastSocket(final MulticastConfiguration config) throws IOException { - return createMulticastSocket(0, config); - } - - public static MulticastSocket createMulticastSocket(final int port, final MulticastConfiguration config) throws IOException { - if (config == null) { - throw new IllegalArgumentException("Configuration may not be null."); - } - - final MulticastSocket socket; - if (port <= 0) { - socket = new MulticastSocket(); - } else { - socket = new MulticastSocket(port); - } - socket.setTimeToLive(config.getTtl().getTtl()); - - if (config.getSocketTimeout() != null) { - socket.setSoTimeout(config.getSocketTimeout()); - } - - if (config.getReuseAddress() != null) { - socket.setReuseAddress(config.getReuseAddress()); - } - - if (config.getReceiveBufferSize() != null) { - socket.setReceiveBufferSize(config.getReceiveBufferSize()); - } - - if (config.getSendBufferSize() != null) { - socket.setSendBufferSize(config.getSendBufferSize()); - } - - if (config.getTrafficClass() != null) { - socket.setTrafficClass(config.getTrafficClass()); - } - - if (config.getLoopbackMode() != null) { - socket.setLoopbackMode(config.getLoopbackMode()); - } - - return socket; - } - - public static void closeQuietly(final MulticastSocket socket) { - - if (socket == null) { - return; - } - - try { - socket.close(); - } catch (final Exception ex) { - logger.debug("Failed to close multicast socket due to: " + ex, ex); - } - - } - - public static void closeQuietly(final MulticastSocket socket, final InetAddress groupAddress) { - - if (socket == null) { - return; - } - - try { - socket.leaveGroup(groupAddress); - } catch (final Exception ex) { - logger.debug("Failed to leave multicast group due to: " + ex, ex); - } - - try { - socket.close(); - } catch (final Exception ex) { - logger.debug("Failed to close multicast socket due to: " + ex, ex); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java deleted file mode 100644 index 173146e..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -/** - * Defines a generic interface for discovering services. - * - * @author unattributed - */ -public interface ServiceDiscovery { - - /** - * @return the discovered service - */ - DiscoverableService getService(); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java deleted file mode 100644 index 86260d8..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -import java.util.Set; - -/** - * Defines the interface for broadcasting a collection of services for client - * discovery. - * - * @author unattributed - */ -public interface ServicesBroadcaster { - - /** - * @return the delay in milliseconds to wait between successive broadcasts - */ - int getBroadcastDelayMs(); - - /** - * @return the broadcasted services - */ - Set getServices(); - - /** - * Adds the given service to the set of broadcasted services. - * - * @param service a service - * @return true if the service was added to the set; false a service with - * the given service name already exists in the set. - */ - boolean addService(DiscoverableService service); - - /** - * Removes the service with the given service name from the set. - * - * @param serviceName a service name - * @return true if the service was removed; false otherwise - */ - boolean removeService(String serviceName); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java deleted file mode 100644 index b5240c9..0000000 --- a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio.example; - -import java.io.IOException; -import java.util.Calendar; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.io.nio.BufferPool; -import org.apache.nifi.io.nio.ChannelListener; -import org.apache.nifi.io.nio.consumer.StreamConsumer; -import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * @author none - */ -public final class ServerMain { - - private static final Logger LOGGER = LoggerFactory.getLogger(ServerMain.class); - - public static void main(final String[] args) throws IOException { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); - - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); - final Map> consumerMap = new ConcurrentHashMap<>(); - final BufferPool bufferPool = new BufferPool(10, 5 << 20, false, 40.0); - ChannelListener listener = null; - try { - executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS); - listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS); - listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS); - listener.addDatagramChannel(null, 20000, 32 << 20); - LOGGER.info("Listening for UDP data on port 20000"); - listener.addServerSocket(null, 20001, 64 << 20); - LOGGER.info("listening for TCP connections on port 20001"); - listener.addServerSocket(null, 20002, 64 << 20); - LOGGER.info("listening for TCP connections on port 20002"); - final Calendar endTime = Calendar.getInstance(); - endTime.add(Calendar.MINUTE, 30); - while (true) { - processAllConsumers(consumerMap); - if (endTime.before(Calendar.getInstance())) { - break; // time to shut down - } - } - } finally { - if (listener != null) { - LOGGER.info("Shutting down server...."); - listener.shutdown(1L, TimeUnit.SECONDS); - LOGGER.info("Consumer map size = " + consumerMap.size()); - while (consumerMap.size() > 0) { - processAllConsumers(consumerMap); - } - LOGGER.info("Consumer map size = " + consumerMap.size()); - } - executor.shutdown(); - } - } - - private static void processAllConsumers(final Map> consumerMap) { - final Set deadConsumers = new HashSet<>(); - for (final Map.Entry> entry : consumerMap.entrySet()) { - if (entry.getKey().isConsumerFinished()) { - entry.getValue().cancel(true); - deadConsumers.add(entry.getKey()); - } - } - for (final StreamConsumer consumer : deadConsumers) { - LOGGER.debug("removing consumer " + consumer); - consumerMap.remove(consumer); - } - } - - public static final class ConsumerRunner implements Runnable { - - private final StreamConsumer consumer; - - public ConsumerRunner(final StreamConsumer consumer) { - this.consumer = consumer; - } - - @Override - public void run() { - if (consumer.isConsumerFinished()) { - return; - } - try { - consumer.process(); - } catch (IOException ex) { - LOGGER.error("", ex); - } - } - } - - public static final class ExampleStreamConsumerFactory implements StreamConsumerFactory { - - final ScheduledExecutorService executor; - final Map> consumerMap; - - public ExampleStreamConsumerFactory(final ScheduledExecutorService executor, final Map> consumerMap) { - this.executor = executor; - this.consumerMap = consumerMap; - } - - @Override - public StreamConsumer newInstance(final String streamId) { - final StreamConsumer consumer = new UselessStreamConsumer(streamId); - final ScheduledFuture future = executor.scheduleWithFixedDelay(new ConsumerRunner(consumer), 0L, 10L, TimeUnit.MILLISECONDS); - consumerMap.put(consumer, future); - LOGGER.info("Added consumer: " + consumer); - return consumer; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java deleted file mode 100644 index b3d214e..0000000 --- a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio.example; - -import java.io.IOException; -import java.net.Socket; -import java.net.SocketException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author none - */ -public class TCPClient { - - private static final Logger logger = LoggerFactory.getLogger(TCPClient.class); - - public static void main(final String[] args) throws Exception { - final byte[] bytes = TCPClient.makeBytes(); - Thread first = new Thread(new Runnable() { - - @Override - public void run() { - try { - for (int i = 0; i < 10; i++) { - sendData(20001, bytes); - } - } catch (Exception e) { - logger.error("Blew exception", e); - } - } - }); - Thread second = new Thread(new Runnable() { - - @Override - public void run() { - try { - for (int i = 0; i < 10; i++) { - sendData(20002, bytes); - } - } catch (Exception e) { - logger.error("Blew exception", e); - } - } - }); - first.start(); - second.start(); - } - - public static byte[] makeBytes() { - byte[] bytes = new byte[2 << 20]; - return bytes; - } - - private static void sendData(final int port, final byte[] bytes) throws SocketException, IOException, InterruptedException { - long totalBytes; - try (Socket sock = new Socket("localhost", port)) { - sock.setTcpNoDelay(true); - sock.setSoTimeout(2000); - totalBytes = 0L; - logger.info("socket established " + sock + " to port " + port + " now waiting 5 seconds to send anything..."); - Thread.sleep(5000L); - for (int i = 0; i < 1000; i++) { - sock.getOutputStream().write(bytes); - totalBytes += bytes.length; - } sock.getOutputStream().flush(); - } - logger.info("Total bytes sent: " + totalBytes + " to port " + port); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java deleted file mode 100644 index 90f4c42..0000000 --- a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio.example; - -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author none - */ -public class UDPClient { - - private static final Logger LOGGER = LoggerFactory.getLogger(UDPClient.class); - - public static void main(final String[] args) throws Exception { - final byte[] buffer = UDPClient.makeBytes(); - final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, new InetSocketAddress("localhost", 20000)); - final DatagramSocket socket = new DatagramSocket(); - final long startTime = System.nanoTime(); - for (int i = 0; i < 819200; i++) { // 100 MB - socket.send(packet); - } - final long endTime = System.nanoTime(); - final long durationMillis = (endTime - startTime) / 1000000; - LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis); - } - - public static byte[] makeBytes() { - byte[] bytes = new byte[128]; - return bytes; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java deleted file mode 100644 index 9ec26e9..0000000 --- a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio.example; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.nifi.io.nio.consumer.AbstractStreamConsumer; - -/** - * - * @author none - */ -public class UselessStreamConsumer extends AbstractStreamConsumer { - - public UselessStreamConsumer(final String id) { - super(id); - } - - @Override - protected void processBuffer(final ByteBuffer buffer) throws IOException { - } - - @Override - protected void onConsumerDone() { - System.err.println("IN consumer done"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/resources/log4j.xml b/commons/nifi-socket-utils/src/test/resources/log4j.xml deleted file mode 100644 index 8e93769..0000000 --- a/commons/nifi-socket-utils/src/test/resources/log4j.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/.gitignore ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/.gitignore b/commons/nifi-utils/.gitignore deleted file mode 100755 index 12c5231..0000000 --- a/commons/nifi-utils/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -/target -/target -/target -/target -/target -/target -/target -/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/pom.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/pom.xml b/commons/nifi-utils/pom.xml deleted file mode 100644 index c5c2a68..0000000 --- a/commons/nifi-utils/pom.xml +++ /dev/null @@ -1,33 +0,0 @@ - - - - 4.0.0 - - - org.apache.nifi - nifi-commons-parent - 0.0.1-SNAPSHOT - - - nifi-utils - 0.0.1-SNAPSHOT - jar - NiFi Utils - - http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java deleted file mode 100644 index 24f43ca..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/CoreAttributes.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.flowfile.attributes; - -public enum CoreAttributes implements FlowFileAttributeKey { - /** - * The flowfile's path indicates the relative directory to which a FlowFile belongs and does not - * contain the filename - */ - PATH("path"), - - /** - * The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not - * contain the filename - */ - ABSOLUTE_PATH("absolute.path"), - - /** - * The filename of the FlowFile. The filename should not contain any directory structure. - */ - FILENAME("filename"), - - /** - * A unique UUID assigned to this FlowFile - */ - UUID("uuid"), - - /** - * A numeric value indicating the FlowFile priority - */ - PRIORITY("priority"), - - /** - * The MIME Type of this FlowFile - */ - MIME_TYPE("mime.type"), - - /** - * Specifies the reason that a FlowFile is being discarded - */ - DISCARD_REASON("discard.reason"), - - /** - * Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile. - */ - ALTERNATE_IDENTIFIER("alternate.identifier"); - - private final String key; - private CoreAttributes(final String key) { - this.key = key; - } - - @Override - public String key() { - return key; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java b/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java deleted file mode 100644 index cc6c28e..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/FlowFileAttributeKey.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.flowfile.attributes; - -public interface FlowFileAttributeKey { - String key(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java deleted file mode 100644 index 77c34c9..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -public class StandardVersionNegotiator implements VersionNegotiator { - - private final List versions; - private int curVersion; - - public StandardVersionNegotiator(final int... supportedVersions) { - if (Objects.requireNonNull(supportedVersions).length == 0) { - throw new IllegalArgumentException("At least one version must be supported"); - } - - final List supported = new ArrayList<>(); - for (final int version : supportedVersions) { - supported.add(version); - } - this.versions = Collections.unmodifiableList(supported); - this.curVersion = supportedVersions[0]; - } - - @Override - public int getVersion() { - return curVersion; - } - - @Override - public void setVersion(final int version) throws IllegalArgumentException { - if (!isVersionSupported(version)) { - throw new IllegalArgumentException("Version " + version + " is not supported"); - } - - this.curVersion = version; - } - - @Override - public int getPreferredVersion() { - return versions.get(0); - } - - @Override - public Integer getPreferredVersion(final int maxVersion) { - for (final Integer version : this.versions) { - if (maxVersion >= version) { - return version; - } - } - return null; - } - - @Override - public boolean isVersionSupported(final int version) { - return versions.contains(version); - } - - @Override - public List getSupportedVersions() { - return versions; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java deleted file mode 100644 index 74f9b3d..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -import java.util.List; - -public interface VersionNegotiator { - - /** - * @return the currently configured Version of this resource - */ - int getVersion(); - - /** - * Sets the version of this resource to the specified version. Only the - * lower byte of the version is relevant. - * - * @param version - * @throws IllegalArgumentException if the given Version is not supported by - * this resource, as is indicated by the {@link #isVersionSupported(int)} - * method - */ - void setVersion(int version) throws IllegalArgumentException; - - /** - * - * @return the Version of this resource that is preferred - */ - int getPreferredVersion(); - - /** - * Gets the preferred version of this resource that is no greater than the - * given maxVersion. If no acceptable version exists that is less than - * maxVersion, then null is returned - * - * @param maxVersion - * @return - */ - Integer getPreferredVersion(int maxVersion); - - /** - * Indicates whether or not the specified version is supported by this - * resource - * - * @param version - * @return - */ - boolean isVersionSupported(int version); - - List getSupportedVersions(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java deleted file mode 100644 index 05fd915..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -/** - * Indicates that the user disabled transmission while communications were - * taking place with a peer - */ -public class TransmissionDisabledException extends RuntimeException { - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java deleted file mode 100644 index 71cf894..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; - -public class CompressionInputStream extends InputStream { - - private final InputStream in; - private final Inflater inflater; - - private byte[] compressedBuffer; - private byte[] buffer; - - private int bufferIndex; - private boolean eos = false; // whether or not we've reached the end of stream - private boolean allDataRead = false; // different from eos b/c eos means allDataRead == true && buffer is empty - - private final byte[] fourByteBuffer = new byte[4]; - - public CompressionInputStream(final InputStream in) { - this.in = in; - inflater = new Inflater(); - - buffer = new byte[0]; - compressedBuffer = new byte[0]; - bufferIndex = 1; - } - - private String toHex(final byte[] array) { - final StringBuilder sb = new StringBuilder("0x"); - for (final byte b : array) { - final String hex = Integer.toHexString(b).toUpperCase(); - if (hex.length() == 1) { - sb.append("0"); - } - sb.append(hex); - } - return sb.toString(); - } - - protected void readChunkHeader() throws IOException { - // Ensure that we have a valid SYNC chunk - fillBuffer(fourByteBuffer); - if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) { - throw new IOException("Invalid CompressionInputStream. Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer)); - } - - // determine the size of the decompressed buffer - fillBuffer(fourByteBuffer); - buffer = new byte[toInt(fourByteBuffer)]; - - // determine the size of the compressed buffer - fillBuffer(fourByteBuffer); - compressedBuffer = new byte[toInt(fourByteBuffer)]; - - bufferIndex = buffer.length; // indicate that buffer is empty - } - - private int toInt(final byte[] data) { - return ((data[0] & 0xFF) << 24) - | ((data[1] & 0xFF) << 16) - | ((data[2] & 0xFF) << 8) - | (data[3] & 0xFF); - } - - protected void bufferAndDecompress() throws IOException { - if (allDataRead) { - eos = true; - return; - } - - readChunkHeader(); - fillBuffer(compressedBuffer); - - inflater.setInput(compressedBuffer); - try { - inflater.inflate(buffer); - } catch (final DataFormatException e) { - throw new IOException(e); - } - inflater.reset(); - - bufferIndex = 0; - final int moreDataByte = in.read(); - if (moreDataByte < 1) { - allDataRead = true; - } else if (moreDataByte > 1) { - throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte); - } - } - - private void fillBuffer(final byte[] buffer) throws IOException { - int len; - int bytesLeft = buffer.length; - int bytesRead = 0; - while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) { - bytesLeft -= len; - bytesRead += len; - } - - if (bytesRead < buffer.length) { - throw new EOFException(); - } - } - - private boolean isBufferEmpty() { - return bufferIndex >= buffer.length; - } - - @Override - public int read() throws IOException { - if (eos) { - return -1; - } - - if (isBufferEmpty()) { - bufferAndDecompress(); - } - - if (isBufferEmpty()) { - eos = true; - return -1; - } - - return buffer[bufferIndex++]; - } - - @Override - public int read(final byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public int read(final byte[] b, final int off, final int len) throws IOException { - if (eos) { - return -1; - } - - if (isBufferEmpty()) { - bufferAndDecompress(); - } - - if (isBufferEmpty()) { - eos = true; - return -1; - } - - final int free = buffer.length - bufferIndex; - final int bytesToTransfer = Math.min(len, free); - System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer); - bufferIndex += bytesToTransfer; - - return bytesToTransfer; - } - - /** - * Does nothing. Does NOT close underlying InputStream - * @throws java.io.IOException - */ - @Override - public void close() throws IOException { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java deleted file mode 100644 index bc46b0f..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.zip.Deflater; - -public class CompressionOutputStream extends OutputStream { - - public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'}; - - public static final int DEFAULT_COMPRESSION_LEVEL = 1; - public static final int DEFAULT_BUFFER_SIZE = 64 << 10; - public static final int MIN_BUFFER_SIZE = 8 << 10; - - private final OutputStream out; - private final Deflater deflater; - - private final byte[] buffer; - private final byte[] compressed; - - private int bufferIndex = 0; - private boolean dataWritten = false; - - public CompressionOutputStream(final OutputStream outStream) { - this(outStream, DEFAULT_BUFFER_SIZE); - } - - public CompressionOutputStream(final OutputStream outStream, final int bufferSize) { - this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY); - } - - public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) { - if (bufferSize < MIN_BUFFER_SIZE) { - throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE); - } - - this.out = outStream; - this.deflater = new Deflater(level); - this.deflater.setStrategy(strategy); - buffer = new byte[bufferSize]; - compressed = new byte[bufferSize + 64]; - } - - /** - * Compresses the currently buffered chunk of data and sends it to the - * output stream - * - * @throws IOException - */ - protected void compressAndWrite() throws IOException { - if (bufferIndex <= 0) { - return; - } - - deflater.setInput(buffer, 0, bufferIndex); - deflater.finish(); - final int compressedBytes = deflater.deflate(compressed); - - writeChunkHeader(compressedBytes); - out.write(compressed, 0, compressedBytes); - - bufferIndex = 0; - deflater.reset(); - } - - private void writeChunkHeader(final int compressedBytes) throws IOException { - // If we have already written data, write out a '1' to indicate that we have more data; when we close - // the stream, we instead write a '0' to indicate that we are finished sending data. - if (dataWritten) { - out.write(1); - } - out.write(SYNC_BYTES); - dataWritten = true; - - writeInt(out, bufferIndex); - writeInt(out, compressedBytes); - } - - private void writeInt(final OutputStream out, final int val) throws IOException { - out.write(val >>> 24); - out.write(val >>> 16); - out.write(val >>> 8); - out.write(val); - } - - protected boolean bufferFull() { - return bufferIndex >= buffer.length; - } - - @Override - public void write(final int b) throws IOException { - buffer[bufferIndex++] = (byte) (b & 0xFF); - if (bufferFull()) { - compressAndWrite(); - } - } - - @Override - public void write(final byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(final byte[] b, final int off, final int len) throws IOException { - int bytesLeft = len; - while (bytesLeft > 0) { - final int free = buffer.length - bufferIndex; - final int bytesThisIteration = Math.min(bytesLeft, free); - System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration); - bufferIndex += bytesThisIteration; - - bytesLeft -= bytesThisIteration; - if (bufferFull()) { - compressAndWrite(); - } - } - } - - @Override - public void flush() throws IOException { - compressAndWrite(); - super.flush(); - } - - @Override - public void close() throws IOException { - compressAndWrite(); - out.write(0); // indicate that the stream is finished. - out.flush(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java deleted file mode 100644 index e03dfbf..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io; - -import java.io.IOException; -import java.io.InputStream; - -import org.apache.nifi.remote.exception.TransmissionDisabledException; - -public class InterruptableInputStream extends InputStream { - - private volatile boolean interrupted = false; - private final InputStream in; - - public InterruptableInputStream(final InputStream in) { - this.in = in; - } - - @Override - public int read() throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - return in.read(); - } - - @Override - public int read(byte[] b) throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - return in.read(b); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - return in.read(b, off, len); - } - - @Override - public int available() throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - return in.available(); - } - - @Override - public void close() throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - in.close(); - } - - @Override - public synchronized void mark(int readlimit) { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - in.mark(readlimit); - } - - @Override - public boolean markSupported() { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - return in.markSupported(); - } - - @Override - public synchronized void reset() throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - in.reset(); - } - - @Override - public long skip(long n) throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - return in.skip(n); - } - - public void interrupt() { - interrupted = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java deleted file mode 100644 index cba5be6..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.nifi.remote.exception.TransmissionDisabledException; - -public class InterruptableOutputStream extends OutputStream { - - private final OutputStream out; - private volatile boolean interrupted = false; - - public InterruptableOutputStream(final OutputStream out) { - this.out = out; - } - - @Override - public void write(int b) throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - out.write(b); - } - - @Override - public void write(byte[] b) throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - out.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - out.write(b, off, len); - } - - @Override - public void close() throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - out.close(); - } - - @Override - public void flush() throws IOException { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - out.flush(); - } - - public void interrupt() { - this.interrupted = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java deleted file mode 100644 index 68913bd..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket; - -import java.nio.ByteBuffer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BufferStateManager { - - private static final Logger logger = LoggerFactory.getLogger(BufferStateManager.class); - - private ByteBuffer buffer; - private Direction direction = Direction.WRITE; - - public BufferStateManager(final ByteBuffer buffer) { - this.buffer = buffer; - } - - public BufferStateManager(final ByteBuffer buffer, final Direction direction) { - this.buffer = buffer; - this.direction = direction; - } - - /** - * Ensures that the buffer is at least as big as the size specified, - * resizing the buffer if necessary. This operation MAY change the direction - * of the buffer. - * - * @param requiredSize - */ - public void ensureSize(final int requiredSize) { - if (buffer.capacity() < requiredSize) { - final ByteBuffer newBuffer = ByteBuffer.allocate(requiredSize); - - // we have to read buffer so make sure the direction is correct. - if (direction == Direction.WRITE) { - buffer.flip(); - } - - // Copy from buffer to newBuffer - newBuffer.put(buffer); - - // Swap the buffers - buffer = newBuffer; - - // the new buffer is ready to be written to - direction = Direction.WRITE; - } - } - - public ByteBuffer prepareForWrite(final int requiredSize) { - ensureSize(requiredSize); - - if (direction == Direction.READ) { - direction = Direction.WRITE; - buffer.position(buffer.limit()); - } - - buffer.limit(buffer.capacity()); - return buffer; - } - - public ByteBuffer prepareForRead(final int requiredSize) { - ensureSize(requiredSize); - - if (direction == Direction.WRITE) { - direction = Direction.READ; - buffer.flip(); - } - - return buffer; - } - - /** - * Clears the contents of the buffer and sets direction to WRITE - */ - public void clear() { - logger.debug("Clearing {}", buffer); - buffer.clear(); - direction = Direction.WRITE; - } - - public void compact() { - final String before = buffer.toString(); - buffer.compact(); - logger.debug("Before compact: {}, after: {}", before, buffer); - direction = Direction.WRITE; - } - - public static enum Direction { - - READ, WRITE; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java deleted file mode 100644 index 32a3f26..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.SocketChannel; -import java.util.concurrent.TimeUnit; - -public class SocketChannelInputStream extends InputStream { - - private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); - private final SocketChannel channel; - private volatile int timeoutMillis = 30000; - - private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1); - private Byte bufferedByte = null; - - public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException { - // this class expects a non-blocking channel - socketChannel.configureBlocking(false); - this.channel = socketChannel; - } - - public void setTimeout(final int timeoutMillis) { - this.timeoutMillis = timeoutMillis; - } - - @Override - public int read() throws IOException { - if (bufferedByte != null) { - final int retVal = bufferedByte & 0xFF; - bufferedByte = null; - return retVal; - } - - oneByteBuffer.flip(); - oneByteBuffer.clear(); - - final long maxTime = System.currentTimeMillis() + timeoutMillis; - int bytesRead; - do { - bytesRead = channel.read(oneByteBuffer); - if (bytesRead == 0) { - if (System.currentTimeMillis() > maxTime) { - throw new SocketTimeoutException("Timed out reading from socket"); - } - try { - TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS); - } catch (InterruptedException e) { - close(); - Thread.currentThread().interrupt(); // set the interrupt status - throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation - } - } - } while (bytesRead == 0); - - if (bytesRead == -1) { - return -1; - } - oneByteBuffer.flip(); - return oneByteBuffer.get() & 0xFF; - } - - @Override - public int read(final byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public int read(final byte[] b, final int off, final int len) throws IOException { - if (bufferedByte != null) { - final byte retVal = bufferedByte; - bufferedByte = null; - b[off] = retVal; - return 1; - } - - final ByteBuffer buffer = ByteBuffer.wrap(b, off, len); - - final long maxTime = System.currentTimeMillis() + timeoutMillis; - int bytesRead; - do { - bytesRead = channel.read(buffer); - if (bytesRead == 0) { - if (System.currentTimeMillis() > maxTime) { - throw new SocketTimeoutException("Timed out reading from socket"); - } - try { - TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS); - } catch (InterruptedException e) { - close(); - Thread.currentThread().interrupt(); // set the interrupt status - throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation - } - } - } while (bytesRead == 0); - - return bytesRead; - } - - @Override - public int available() throws IOException { - if (bufferedByte != null) { - return 1; - } - - isDataAvailable(); // attempt to read from socket - return (bufferedByte == null) ? 0 : 1; - } - - public boolean isDataAvailable() throws IOException { - if (bufferedByte != null) { - return true; - } - - oneByteBuffer.flip(); - oneByteBuffer.clear(); - final int bytesRead = channel.read(oneByteBuffer); - if (bytesRead == -1) { - throw new EOFException("Peer has closed the stream"); - } - if (bytesRead > 0) { - oneByteBuffer.flip(); - bufferedByte = oneByteBuffer.get(); - return true; - } - return false; - } - - /** - * Closes the underlying socket channel. - * @throws java.io.IOException - */ - @Override - public void close() throws IOException { - channel.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java deleted file mode 100644 index 77049ad..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket; - -import java.io.IOException; -import java.io.OutputStream; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.SocketChannel; -import java.util.concurrent.TimeUnit; - -public class SocketChannelOutputStream extends OutputStream { - - private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); - private final SocketChannel channel; - private volatile int timeout = 30000; - - private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1); - - public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException { - // this class expects a non-blocking channel - socketChannel.configureBlocking(false); - this.channel = socketChannel; - } - - public void setTimeout(final int timeoutMillis) { - this.timeout = timeoutMillis; - } - - @Override - public void write(final int b) throws IOException { - oneByteBuffer.flip(); - oneByteBuffer.clear(); - oneByteBuffer.put((byte) b); - oneByteBuffer.flip(); - - final int timeoutMillis = this.timeout; - long maxTime = System.currentTimeMillis() + timeoutMillis; - int bytesWritten; - while (oneByteBuffer.hasRemaining()) { - bytesWritten = channel.write(oneByteBuffer); - if (bytesWritten == 0) { - if (System.currentTimeMillis() > maxTime) { - throw new SocketTimeoutException("Timed out writing to socket"); - } - try { - TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS); - } catch (InterruptedException e) { - close(); - Thread.currentThread().interrupt(); // set the interrupt status - throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation - } - } else { - return; - } - } - } - - @Override - public void write(final byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(final byte[] b, final int off, final int len) throws IOException { - final ByteBuffer buffer = ByteBuffer.wrap(b, off, len); - - final int timeoutMillis = this.timeout; - long maxTime = System.currentTimeMillis() + timeoutMillis; - int bytesWritten; - while (buffer.hasRemaining()) { - bytesWritten = channel.write(buffer); - if (bytesWritten == 0) { - if (System.currentTimeMillis() > maxTime) { - throw new SocketTimeoutException("Timed out writing to socket"); - } - try { - TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS); - } catch (InterruptedException e) { - close(); - Thread.currentThread().interrupt(); // set the interrupt status - throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation - } - } else { - maxTime = System.currentTimeMillis() + timeoutMillis; - } - } - } - - /** - * Closes the underlying SocketChannel - * @throws java.io.IOException - */ - @Override - public void close() throws IOException { - channel.close(); - } -}