Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5CD8A200C62 for ; Wed, 26 Apr 2017 15:55:26 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5862D160BA8; Wed, 26 Apr 2017 13:55:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7A183160B95 for ; Wed, 26 Apr 2017 15:55:25 +0200 (CEST) Received: (qmail 95883 invoked by uid 500); 26 Apr 2017 13:55:24 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 95874 invoked by uid 99); 26 Apr 2017 13:55:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Apr 2017 13:55:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7E92FDFBB7; Wed, 26 Apr 2017 13:55:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Message-Id: <1fa58c75141e4e58bdd4934fab96e2c7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-6384] [py] Remove python binary check via additional process Date: Wed, 26 Apr 2017 13:55:24 +0000 (UTC) archived-at: Wed, 26 Apr 2017 13:55:26 -0000 Repository: flink Updated Branches: refs/heads/master 0d856b34f -> 1829819b6 [FLINK-6384] [py] Remove python binary check via additional process The PythonStreamer used to check for the existence of the python binary by starting a python process. This process was not closed afterwards. This caused the PythonPlanBinderTest to fail locally. I think the check whether a python binary exists is not necessary since the subsequent python command would fail anyway if there is no binary available on the system. The system failure message is that there is no such file or directory. This error message should be descriptive enough in order to debug such a problem. This closes #3774. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1829819b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1829819b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1829819b Branch: refs/heads/master Commit: 1829819b64fb32740bcbd4d5074e3fa138276f89 Parents: 0d856b3 Author: Till Rohrmann Authored: Tue Apr 25 20:41:58 2017 +0200 Committer: Till Rohrmann Committed: Wed Apr 26 15:54:56 2017 +0200 ---------------------------------------------------------------------- .../api/streaming/data/PythonStreamer.java | 32 ++++++++++++-------- .../flink/python/api/PythonPlanBinderTest.java | 21 ++++++++++--- 2 files changed, 36 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1829819b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index 97d5780..cc4ba43 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -19,6 +19,7 @@ import org.apache.flink.python.api.PythonOptions; import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer; import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer; import org.apache.flink.python.api.streaming.util.StreamPrinter; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,13 +115,7 @@ public class PythonStreamer implements Serializable String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH); - try { - Runtime.getRuntime().exec(pythonBinaryPath); - } catch (IOException ignored) { - throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); - } - - process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + config.getString(PLAN_ARGUMENTS_KEY, "")); + process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")}); outPrinter = new Thread(new StreamPrinter(process.getInputStream())); outPrinter.start(); errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg)); @@ -130,8 +125,9 @@ public class PythonStreamer implements Serializable @Override public void run() { try { - destroyProcess(); - } catch (IOException ignored) { + destroyProcess(process); + } catch (IOException ioException) { + LOG.warn("Could not destroy python process.", ioException); } } }; @@ -192,20 +188,30 @@ public class PythonStreamer implements Serializable * @throws IOException */ public void close() throws IOException { + Throwable throwable = null; + try { socket.close(); sender.close(); receiver.close(); - } catch (Exception e) { - LOG.error("Exception occurred while closing Streamer. :{}", e.getMessage()); + } catch (Throwable t) { + throwable = t; } - destroyProcess(); + + try { + destroyProcess(process); + } catch (Throwable t) { + throwable = ExceptionUtils.firstOrSuppressed(t, throwable); + } + if (shutdownThread != null) { Runtime.getRuntime().removeShutdownHook(shutdownThread); } + + ExceptionUtils.tryRethrowIOException(throwable); } - private void destroyProcess() throws IOException { + public static void destroyProcess(Process process) throws IOException { try { process.exitValue(); } catch (IllegalThreadStateException ignored) { //process still active http://git-wip-us.apache.org/repos/asf/flink/blob/1829819b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index ba8ea78..20f3503 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -16,6 +16,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.python.api.streaming.data.PythonStreamer; import org.apache.flink.test.util.JavaProgramTestBase; import java.io.IOException; @@ -50,21 +51,33 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { return files; } - private static boolean isPython2Supported() { + private static boolean isPython2Supported() throws IOException { + Process process = null; + try { - Runtime.getRuntime().exec("python"); + process = Runtime.getRuntime().exec("python"); return true; } catch (IOException ex) { return false; + } finally { + if (process != null) { + PythonStreamer.destroyProcess(process); + } } } - private static boolean isPython3Supported() { + private static boolean isPython3Supported() throws IOException { + Process process = null; + try { - Runtime.getRuntime().exec("python3"); + process = Runtime.getRuntime().exec("python3"); return true; } catch (IOException ex) { return false; + } finally { + if (process != null) { + PythonStreamer.destroyProcess(process); + } } }