flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [30/37] flink git commit: [hotfix] [py] Code cleanup - StreamPrinter
Date Thu, 06 Apr 2017 07:28:43 GMT
[hotfix] [py] Code cleanup - StreamPrinter

- implements Runnable instead of extending Thread
- use AtomicReference<String> instead of StringBuilder
- remove redundant wrapInException argument


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/258ed179
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/258ed179
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/258ed179

Branch: refs/heads/table-retraction
Commit: 258ed1791a3e66443b29e4a4e14dcc4e03ffd9e6
Parents: f4324ba
Author: zentol <chesnay@apache.org>
Authored: Thu Mar 30 19:41:54 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Wed Apr 5 20:43:55 2017 +0200

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      |  7 +++--
 .../api/streaming/plan/PythonPlanStreamer.java  |  4 +--
 .../api/streaming/util/StreamPrinter.java       | 30 ++++++++++++--------
 3 files changed, 24 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/258ed179/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 219ae27..3c79b1f 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -32,6 +32,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -71,7 +72,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 	protected S sender;
 	protected PythonReceiver<OUT> receiver;
 
-	protected StringBuilder msg = new StringBuilder();
+	protected AtomicReference<String> msg = new AtomicReference<>();
 
 	protected final AbstractRichFunction function;
 
@@ -118,9 +119,9 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable
 		}
 
 		process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
-		outPrinter = new StreamPrinter(process.getInputStream());
+		outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
 		outPrinter.start();
-		errorPrinter = new StreamPrinter(process.getErrorStream(), true, msg);
+		errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));
 		errorPrinter.start();
 
 		shutdownThread = new Thread() {

http://git-wip-us.apache.org/repos/asf/flink/blob/258ed179/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 4eb0f51..9b62563 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -67,8 +67,8 @@ public class PythonPlanStreamer {
 		}
 		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tmpPath + FLINK_PYTHON_PLAN_NAME + args);
 
-		new StreamPrinter(process.getInputStream()).start();
-		new StreamPrinter(process.getErrorStream()).start();
+		new Thread(new StreamPrinter(process.getInputStream())).start();
+		new Thread(new StreamPrinter(process.getErrorStream())).start();
 
 		server = new ServerSocket(0);
 		server.setSoTimeout(50);

http://git-wip-us.apache.org/repos/asf/flink/blob/258ed179/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
index 30a728c..c6a1ede 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
@@ -18,40 +18,46 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Simple utility class to print all contents of an inputstream to stdout.
  */
-public class StreamPrinter extends Thread {
+public class StreamPrinter implements Runnable {
 	private final BufferedReader reader;
-	private final boolean wrapInException;
-	private StringBuilder msg;
+	private final AtomicReference<String> output;
 
 	public StreamPrinter(InputStream stream) {
-		this(stream, false, null);
+		this(stream, null);
 	}
 
-	public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
+	public StreamPrinter(InputStream stream, AtomicReference<String> output) {
 		this.reader = new BufferedReader(new InputStreamReader(stream, ConfigConstants.DEFAULT_CHARSET));
-		this.wrapInException = wrapInException;
-		this.msg = msg;
+		this.output = output;
 	}
 
 	@Override
 	public void run() {
 		String line;
-		try {
-			if (wrapInException) {
+		if (output != null) {
+			StringBuilder msg = new StringBuilder();
+			try {
 				while ((line = reader.readLine()) != null) {
-					msg.append("\n" + line);
+					msg.append(line);
+					msg.append("\n");
 				}
-			} else {
+			} catch (IOException ignored) {
+			} finally {
+				output.set(msg.toString());
+			}
+		} else {
+			try {
 				while ((line = reader.readLine()) != null) {
 					System.out.println(line);
 					System.out.flush();
 				}
+			} catch (IOException ignored) {
 			}
-		} catch (IOException ex) {
 		}
 	}
 }


Mime
View raw message