flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/6] flink git commit: [hotfix] [nifi] Minor style cleanups in NiFi source
Date Wed, 08 Jun 2016 13:17:47 GMT
[hotfix] [nifi] Minor style cleanups in NiFi source


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b08b64ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b08b64ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b08b64ab

Branch: refs/heads/master
Commit: b08b64abdb7c9bd7946e9c36e63ec368a1ac5032
Parents: 38362c4
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Jun 7 19:23:56 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/nifi/NiFiSource.java   | 99 ++++++++++----------
 1 file changed, 49 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b08b64ab/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
index 00b6921..57c59ec 100644
--- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.nifi;
 
 import org.apache.flink.api.common.functions.StoppableFunction;
@@ -26,6 +27,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.stream.io.StreamUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,13 +42,20 @@ import java.util.Map;
  */
 public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{
 
+	private static final long serialVersionUID = 1L;
+
 	private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
 
 	private static final long DEFAULT_WAIT_TIME_MS = 1000;
 
-	private long waitTimeMs;
-	private SiteToSiteClient client;
-	private SiteToSiteClientConfig clientConfig;
+	// ------------------------------------------------------------------------
+
+	private final SiteToSiteClientConfig clientConfig;
+
+	private final long waitTimeMs;
+
+	private transient SiteToSiteClient client;
+
 	private volatile boolean isRunning = true;
 
 	/**
@@ -73,63 +82,58 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> imple
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-		isRunning = true;
 	}
 
 	@Override
 	public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
-		try {
-			while (isRunning) {
-				final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
-				if (transaction == null) {
-					LOG.warn("A transaction could not be created, waiting and will try again...");
-					try {
-						Thread.sleep(waitTimeMs);
-					} catch (InterruptedException ignored) {
-
-					}
-					continue;
+		while (isRunning) {
+			final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+			if (transaction == null) {
+				LOG.warn("A transaction could not be created, waiting and will try again...");
+				try {
+					Thread.sleep(waitTimeMs);
+				} catch (InterruptedException ignored) {
+
 				}
+				continue;
+			}
 
-				DataPacket dataPacket = transaction.receive();
-				if (dataPacket == null) {
-					transaction.confirm();
-					transaction.complete();
+			DataPacket dataPacket = transaction.receive();
+			if (dataPacket == null) {
+				transaction.confirm();
+				transaction.complete();
 
-					LOG.debug("No data available to pull, waiting and will try again...");
-					try {
-						Thread.sleep(waitTimeMs);
-					} catch (InterruptedException ignored) {
+				LOG.debug("No data available to pull, waiting and will try again...");
+				try {
+					Thread.sleep(waitTimeMs);
+				} catch (InterruptedException ignored) {
 
-					}
-					continue;
 				}
+				continue;
+			}
 
-				final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
-				do {
-					// Read the data into a byte array and wrap it along with the attributes
-					// into a NiFiDataPacket.
-					final InputStream inStream = dataPacket.getData();
-					final byte[] data = new byte[(int) dataPacket.getSize()];
-					StreamUtils.fillBuffer(inStream, data);
-
-					final Map<String, String> attributes = dataPacket.getAttributes();
+			final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
+			do {
+				// Read the data into a byte array and wrap it along with the attributes
+				// into a NiFiDataPacket.
+				final InputStream inStream = dataPacket.getData();
+				final byte[] data = new byte[(int) dataPacket.getSize()];
+				StreamUtils.fillBuffer(inStream, data);
 
-					niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
-					dataPacket = transaction.receive();
-				} while (dataPacket != null);
+				final Map<String, String> attributes = dataPacket.getAttributes();
 
-				// Confirm transaction to verify the data
-				transaction.confirm();
+				niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
+				dataPacket = transaction.receive();
+			} while (dataPacket != null);
 
-				for (NiFiDataPacket dp : niFiDataPackets) {
-					ctx.collect(dp);
-				}
+			// Confirm transaction to verify the data
+			transaction.confirm();
 
-				transaction.complete();
+			for (NiFiDataPacket dp : niFiDataPackets) {
+				ctx.collect(dp);
 			}
-		} finally {
-			ctx.close();
+
+			transaction.complete();
 		}
 	}
 
@@ -144,11 +148,6 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> imple
 		client.close();
 	}
 
- /**
-	* {@inheritDoc}
-	* <p>
-	* Sets the {@link #isRunning} flag to {@code false}.
-	*/
 	@Override
 	public void stop() {
 		this.isRunning = false;


Mime
View raw message