flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [2/3] flink git commit: [FLINK-8109][py] Check for existence of plan/additional files
Date Mon, 20 Nov 2017 14:44:26 GMT
[FLINK-8109][py] Check for existence of plan/additional files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/497c36f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/497c36f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/497c36f0

Branch: refs/heads/release-1.4
Commit: 497c36f0d25c0277832a682226f6f08ba7c83635
Parents: 8eadda3
Author: zentol <chesnay@apache.org>
Authored: Mon Nov 20 12:58:27 2017 +0100
Committer: zentol <chesnay@apache.org>
Committed: Mon Nov 20 15:44:08 2017 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinder.java      | 21 +++++++-
 .../flink/python/api/PythonPlanBinderTest.java  | 53 ++++++++++++++++----
 2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/497c36f0/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index b7adde1..e0c8215 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
@@ -93,7 +94,12 @@ public class PythonPlanBinder {
 	public static void main(String[] args) throws Exception {
 		Configuration globalConfig = GlobalConfiguration.loadConfiguration();
 		PythonPlanBinder binder = new PythonPlanBinder(globalConfig);
-		binder.runPlan(args);
+		try {
+			binder.runPlan(args);
+		} catch (Exception e) {
+			System.out.println("Failed to run plan: " + e.getMessage());
+			LOG.error("Failed to run plan.", e);
+		}
 	}
 
 	public PythonPlanBinder(Configuration globalConfig) {
@@ -146,11 +152,22 @@ public class PythonPlanBinder {
 
 			operatorConfig.setString(PLAN_ARGUMENTS_KEY, planArguments);
 
+			Path planPath = new Path(planFile);
+			if (!FileSystem.getUnguardedFileSystem(planPath.toUri()).exists(planPath)) {
+				throw new FileNotFoundException("Plan file " + planFile + " does not exist.");
+			}
+			for (String file : filesToCopy) {
+				Path filePath = new Path(file);
+				if (!FileSystem.getUnguardedFileSystem(filePath.toUri()).exists(filePath)) {
+					throw new FileNotFoundException("Additional file " + file + " does not exist.");
+				}
+			}
+
 			// copy flink library, plan file and additional files to temporary location
 			Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
 			deleteIfExists(tmpPlanFilesPath);
 			FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false);
-			copyFile(new Path(planFile), tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
+			copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
 			for (String file : filesToCopy) {
 				Path source = new Path(file);
 				copyFile(source, tmpPlanFilesPath, source.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/497c36f0/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 9e63091..55cf1dc 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
@@ -119,22 +120,52 @@ public class PythonPlanBinderTest extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 		testBoundCheck();
-		String utils = findUtilsFile();
+		testNotExistingPlanFile();
+		testNotExistingAdditionalFile();
 		String python2 = getPython2Path();
 		if (python2 != null) {
-			for (String file : findTestFiles()) {
-				Configuration configuration = new Configuration();
-				configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python2);
-				new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
-			}
+			log.info("Running python2 tests");
+			runTestPrograms(python2);
 		}
 		String python3 = getPython3Path();
 		if (python3 != null) {
-			for (String file : findTestFiles()) {
-				Configuration configuration = new Configuration();
-				configuration.setString(PythonOptions.PYTHON_BINARY_PATH, python3);
-				new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
-			}
+			log.info("Running python3 tests");
+			runTestPrograms(python3);
+		}
+	}
+
+	private void runTestPrograms(String pythonBinary) throws Exception {
+		String utils = findUtilsFile();
+		for (String file : findTestFiles()) {
+			log.info("Running file {}.", file);
+			Configuration configuration = new Configuration();
+			configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
+			new PythonPlanBinder(configuration).runPlan(new String[]{file, utils});
+		}
+	}
+
+	private void testNotExistingPlanFile() throws Exception {
+		log.info("Running testNotExistingPlanFile.");
+		String utils = findUtilsFile();
+		String nonExistingPlan = utils + "abc";
+		Configuration configuration = new Configuration();
+		try {
+			new PythonPlanBinder(configuration).runPlan(new String[]{nonExistingPlan});
+		} catch (FileNotFoundException expected) {
+			// we expect this exception to be thrown since the plan file does not exist
+		}
+	}
+
+	private void testNotExistingAdditionalFile() throws Exception {
+		log.info("Running testNotExistingAdditionalFile.");
+		String utils = findUtilsFile();
+		String planFile = findTestFiles().iterator().next();
+		String nonExistingLibrary = utils + "abc";
+		Configuration configuration = new Configuration();
+		try {
+			new PythonPlanBinder(configuration).runPlan(new String[]{planFile, utils, nonExistingLibrary});
+		} catch (FileNotFoundException expected) {
+			// we expect this exception to be thrown since the additional file does not exist
 		}
 	}
 


Mime
View raw message