flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-6384] [py] Remove python binary check via additional process
Date Wed, 26 Apr 2017 13:55:24 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0d856b34f -> 1829819b6


[FLINK-6384] [py] Remove python binary check via additional process

The PythonStreamer used to check for the existence of the python binary by
starting a python process. This process was not closed afterwards. This caused
the PythonPlanBinderTest to fail locally.

I think the check whether a python binary exists is not necessary since the
subsequent python command would fail anyway if there is no binary available on
the system. The system failure message is that there is no such file or directory.
This error message should be descriptive enough in order to debug such a problem.

This closes #3774.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1829819b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1829819b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1829819b

Branch: refs/heads/master
Commit: 1829819b64fb32740bcbd4d5074e3fa138276f89
Parents: 0d856b3
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Apr 25 20:41:58 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Apr 26 15:54:56 2017 +0200

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 32 ++++++++++++--------
 .../flink/python/api/PythonPlanBinderTest.java  | 21 ++++++++++---
 2 files changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1829819b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 97d5780..cc4ba43 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -19,6 +19,7 @@ import org.apache.flink.python.api.PythonOptions;
 import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
 import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,13 +115,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements
Serializable
 
 		String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
-		try {
-			Runtime.getRuntime().exec(pythonBinaryPath);
-		} catch (IOException ignored) {
-			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
-		}
-
-		process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + config.getString(PLAN_ARGUMENTS_KEY,
""));
+		process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath,
config.getString(PLAN_ARGUMENTS_KEY, "")});
 		outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
 		outPrinter.start();
 		errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));
@@ -130,8 +125,9 @@ public class PythonStreamer<S extends PythonSender, OUT> implements
Serializable
 			@Override
 			public void run() {
 				try {
-					destroyProcess();
-				} catch (IOException ignored) {
+					destroyProcess(process);
+				} catch (IOException ioException) {
+					LOG.warn("Could not destroy python process.", ioException);
 				}
 			}
 		};
@@ -192,20 +188,30 @@ public class PythonStreamer<S extends PythonSender, OUT> implements
Serializable
 	 * @throws IOException
 	 */
 	public void close() throws IOException {
+		Throwable throwable = null;
+
 		try {
 			socket.close();
 			sender.close();
 			receiver.close();
-		} catch (Exception e) {
-			LOG.error("Exception occurred while closing Streamer. :{}", e.getMessage());
+		} catch (Throwable t) {
+			throwable = t;
 		}
-		destroyProcess();
+
+		try {
+			destroyProcess(process);
+		} catch (Throwable t) {
+			throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
+		}
+
 		if (shutdownThread != null) {
 			Runtime.getRuntime().removeShutdownHook(shutdownThread);
 		}
+
+		ExceptionUtils.tryRethrowIOException(throwable);
 	}
 
-	private void destroyProcess() throws IOException {
+	public static void destroyProcess(Process process) throws IOException {
 		try {
 			process.exitValue();
 		} catch (IllegalThreadStateException ignored) { //process still active

http://git-wip-us.apache.org/repos/asf/flink/blob/1829819b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index ba8ea78..20f3503 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -16,6 +16,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 import java.io.IOException;
@@ -50,21 +51,33 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 		return files;
 	}
 
-	private static boolean isPython2Supported() {
+	private static boolean isPython2Supported() throws IOException {
+		Process process = null;
+
 		try {
-			Runtime.getRuntime().exec("python");
+			process = Runtime.getRuntime().exec("python");
 			return true;
 		} catch (IOException ex) {
 			return false;
+		} finally {
+			if (process != null) {
+				PythonStreamer.destroyProcess(process);
+			}
 		}
 	}
 
-	private static boolean isPython3Supported() {
+	private static boolean isPython3Supported() throws IOException {
+		Process process = null;
+
 		try {
-			Runtime.getRuntime().exec("python3");
+			process = Runtime.getRuntime().exec("python3");
 			return true;
 		} catch (IOException ex) {
 			return false;
+		} finally {
+			if (process != null) {
+				PythonStreamer.destroyProcess(process);
+			}
 		}
 	}
 


Mime
View raw message