flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [31/37] flink git commit: [hotfix] [py] Code cleanup - PythonStreamer
Date Thu, 06 Apr 2017 07:28:44 GMT
[hotfix] [py] Code cleanup - PythonStreamer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd588efe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd588efe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd588efe

Branch: refs/heads/table-retraction
Commit: dd588efe6470dfdcad51249ff36f12cfde7b75da
Parents: 258ed17
Author: zentol <chesnay@apache.org>
Authored: Thu Mar 30 23:13:07 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Wed Apr 5 20:44:30 2017 +0200

----------------------------------------------------------------------
 .../python/api/streaming/data/PythonStreamer.java | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd588efe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 3c79b1f..006a1b2 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -84,7 +84,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements
Serializable
 		this.setID = setID;
 		this.usePython3 = PythonPlanBinder.usePython3;
 		planArguments = PythonPlanBinder.arguments.toString();
-		receiver = new PythonReceiver(usesByteArray);
+		receiver = new PythonReceiver<>(usesByteArray);
 		this.function = function;
 		this.sender = sender;
 	}
@@ -162,10 +162,22 @@ public class PythonStreamer<S extends PythonSender, OUT> implements
Serializable
 	private void checkPythonProcessHealth() {
 		try {
 			int value = process.exitValue();
+			try {
+				outPrinter.join();
+			} catch (InterruptedException ignored) {
+				outPrinter.interrupt();
+				Thread.interrupted();
+			}
+			try {
+				errorPrinter.join();
+			} catch (InterruptedException ignored) {
+				errorPrinter.interrupt();
+				Thread.interrupted();
+			}
 			if (value != 0) {
-				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+				throw new RuntimeException("Plan file caused an error. Check log-files for details."
+ msg.get());
 			} else {
-				throw new RuntimeException("Plan file exited prematurely without an error.");
+				throw new RuntimeException("Plan file exited prematurely without an error." + msg.get());
 			}
 		} catch (IllegalThreadStateException ignored) {//Process still running
 		}


Mime
View raw message