flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [01/12] flink git commit: [hotfix] [py] Code cleanup - PythonPlanBinder [Forced Update!]
Date Thu, 06 Apr 2017 19:29:08 GMT
Repository: flink
Updated Branches:
  refs/heads/table-retraction 448cb333b -> ff2625089 (forced update)


[hotfix] [py] Code cleanup - PythonPlanBinder


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/940d16c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/940d16c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/940d16c7

Branch: refs/heads/table-retraction
Commit: 940d16c77731a8558b13cd9b3406862229e4bd4f
Parents: bba49d6
Author: zentol <chesnay@apache.org>
Authored: Thu Mar 30 23:46:44 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Thu Apr 6 10:57:10 2017 +0200

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinder.java      | 45 ++++++++++----------
 1 file changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/940d16c7/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 7c228e1..733a6fb 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -12,13 +12,6 @@
  */
 package org.apache.flink.python.api;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.Random;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -40,24 +33,32 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.python.api.functions.util.NestedKeyDiscarder;
-import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
 import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint;
-import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
-import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
-import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY;
 import org.apache.flink.python.api.functions.PythonCoGroup;
-import org.apache.flink.python.api.functions.util.IdentityGroupReduce;
 import org.apache.flink.python.api.functions.PythonMapPartition;
+import org.apache.flink.python.api.functions.util.IdentityGroupReduce;
 import org.apache.flink.python.api.functions.util.KeyDiscarder;
+import org.apache.flink.python.api.functions.util.NestedKeyDiscarder;
 import org.apache.flink.python.api.functions.util.SerializerMap;
 import org.apache.flink.python.api.functions.util.StringDeserializerMap;
+import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 import org.apache.flink.python.api.util.SetCache;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
+import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
+import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY;
+
 /**
  * This class allows the execution of a Flink plan written in python.
  */
@@ -116,7 +117,7 @@ public class PythonPlanBinder {
 		binder.runPlan(Arrays.copyOfRange(args, 1, args.length));
 	}
 
-	public PythonPlanBinder() throws IOException {
+	public PythonPlanBinder() {
 		Configuration conf = GlobalConfiguration.loadConfiguration();
 		FLINK_PYTHON2_BINARY_PATH = conf.getString(FLINK_PYTHON2_BINARY_KEY, "python");
 		FLINK_PYTHON3_BINARY_PATH = conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
@@ -164,6 +165,7 @@ public class PythonPlanBinder {
 	}
 
 	//=====Setup========================================================================================================
+
 	/**
 	 * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute
it as one big
 	 * package, and resolves PYTHONPATH issues.
@@ -186,7 +188,7 @@ public class PythonPlanBinder {
 		}
 	}
 
-	private static void clearPath(String path) throws IOException, URISyntaxException {
+	private static void clearPath(String path) throws IOException {
 		FileSystem fs = FileSystem.get(new Path(path).toUri());
 		if (fs.exists(new Path(path))) {
 			fs.delete(new Path(path), true);
@@ -232,9 +234,9 @@ public class PythonPlanBinder {
 			local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
 			local.delete(new Path(FLINK_TMP_DATA_DIR), true);
 			streamer.close();
-		} catch (NullPointerException npe) {
+		} catch (NullPointerException ignored) {
 		} catch (IOException ioe) {
-			LOG.error("PythonAPI file cleanup failed. " + ioe.getMessage());
+			LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage());
 		} catch (URISyntaxException use) { // can't occur
 		}
 	}
@@ -249,6 +251,7 @@ public class PythonPlanBinder {
 	}
 
 	//====Environment===================================================================================================
+
 	/**
 	 * This enum contains the identifiers for all supported environment parameters.
 	 */
@@ -285,6 +288,7 @@ public class PythonPlanBinder {
 	}
 
 	//====Operations====================================================================================================
+
 	/**
 	 * This enum contains the identifiers for all supported DataSet operations.
 	 */
@@ -300,12 +304,7 @@ public class PythonPlanBinder {
 		Integer operationCount = (Integer) streamer.getRecord(true);
 		for (int x = 0; x < operationCount; x++) {
 			PythonOperationInfo info = new PythonOperationInfo(streamer, currentEnvironmentID);
-			Operation op;
-			try {
-				op = Operation.valueOf(info.identifier.toUpperCase());
-			} catch (IllegalArgumentException iae) {
-				throw new IllegalArgumentException("Invalid operation specified: " + info.identifier);
-			}
+			Operation op = Operation.valueOf(info.identifier.toUpperCase());
 			switch (op) {
 				case SOURCE_CSV:
 					createCsvSource(info);


Mime
View raw message