flink-commits mailing list archives

From fhue...@apache.org
Subject [04/12] flink git commit: [FLINK-6229] [py] Rework setup of PythonPlanBinder
Date Thu, 06 Apr 2017 19:29:11 GMT
[FLINK-6229] [py] Rework setup of PythonPlanBinder

- make file/argument split more readable
- pass on Paths where applicable instead of recreating them every time
- rename PPB#clearPath to the more appropriate deleteIfExists
- simplify PPB#copyFile
- simplify PPB#startPython
- use UUID#randomUUID() instead of Random.nextInt()
- remove several invalid exception declarations
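
As a quick orientation before the diff, the sketch below (not part of the commit; class and method names are illustrative only) condenses what the bullets describe: the "-" separator splits the command line into the plan file, additional files to ship, and arguments forwarded to the Python plan; temporary plan directories are named with UUID#randomUUID() instead of Random#nextInt(); and deleteIfExists replaces the old clearPath helper.

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

class PlanBinderSetupSketch {

	// Command line layout: <planFile> [additionalFiles...] - [planArguments...]
	static void splitArguments(String[] args) {
		int split = 0;
		for (int x = 0; x < args.length; x++) {
			if (args[x].equals("-")) {
				split = x;
				break;
			}
		}
		String planFile = args[0];
		String[] filesToCopy = Arrays.copyOfRange(args, 1, split == 0 ? args.length : split);
		String[] planArguments = Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length);
		System.out.println(planFile + " | " + Arrays.toString(filesToCopy) + " | " + Arrays.toString(planArguments));
	}

	// UUID#randomUUID() yields collision-resistant temp directory names, unlike Random#nextInt().
	static String newPlanTmpDir() {
		return System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
	}

	// Recursively delete the path if it exists; the diff introduces this as PythonPlanBinder#deleteIfExists.
	static void deleteIfExists(Path path) throws IOException {
		FileSystem fs = path.getFileSystem();
		if (fs.exists(path)) {
			fs.delete(path, true);
		}
	}

	public static void main(String[] arguments) throws IOException {
		splitArguments(new String[] {"plan.py", "util.py", "-", "--input", "data.csv"});
		System.out.println(newPlanTmpDir());
		deleteIfExists(new Path(newPlanTmpDir()));
	}
}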


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ff9c99f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ff9c99f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ff9c99f

Branch: refs/heads/table-retraction
Commit: 5ff9c99ff193725c125f2b4c450411db97b6f3b4
Parents: bdcebfd
Author: zentol <chesnay@apache.org>
Authored: Fri Mar 31 12:11:40 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Thu Apr 6 10:57:11 2017 +0200

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinder.java      | 283 ++++++++-----------
 1 file changed, 124 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ff9c99f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index b6181b4..2378d60 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -49,9 +49,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.Arrays;
-import java.util.Random;
+import java.util.UUID;
 
 import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
 import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
@@ -68,9 +67,6 @@ public class PythonPlanBinder {
 
 	public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
 	public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
-
-	private static final Random r = new Random();
-
 	public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";
 
 	private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
@@ -80,10 +76,9 @@ public class PythonPlanBinder {
 	private final String pythonLibraryPath;
 
 	private final String tmpPlanFilesDir;
-	private String tmpDistributedDir;
+	private Path tmpDistributedDir;
 
 	private final SetCache sets = new SetCache();
-	public ExecutionEnvironment env;
 	private int currentEnvironmentID = 0;
 	private PythonPlanStreamer streamer;
 
@@ -108,9 +103,9 @@ public class PythonPlanBinder {
 		String configuredPlanTmpPath = globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
 		tmpPlanFilesDir = configuredPlanTmpPath != null
 			? configuredPlanTmpPath
-			: System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + r.nextInt();
+			: System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
 		
-		tmpDistributedDir = globalConfig.getString(PythonOptions.DC_TMP_DIR);
+		tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
 		
 		String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
 		pythonLibraryPath = flinkRootDir != null
@@ -131,121 +126,95 @@ public class PythonPlanBinder {
 	void runPlan(String[] args) throws Exception {
 		int split = 0;
 		for (int x = 0; x < args.length; x++) {
-			if (args[x].compareTo("-") == 0) {
+			if (args[x].equals("-")) {
 				split = x;
+				break;
 			}
 		}
 
 		try {
-			String tmpPath = tmpPlanFilesDir;
-			prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split));
-			startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
+			String planFile = args[0];
+			String[] filesToCopy = Arrays.copyOfRange(args, 1, split == 0 ? args.length : split);
+			String[] planArgumentsArray = Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length);
+
+			StringBuilder planArgumentsBuilder = new StringBuilder();
+			for (String arg : planArgumentsArray) {
+				planArgumentsBuilder.append(" ").append(arg);
+			}
+			String planArguments = planArgumentsBuilder.toString();
+
+			operatorConfig.setString(PLAN_ARGUMENTS_KEY, planArguments);
+
+			// copy flink library, plan file and additional files to temporary location
+			Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
+			deleteIfExists(tmpPlanFilesPath);
+			FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false);
+			copyFile(new Path(planFile), tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
+			for (String file : filesToCopy) {
+				Path source = new Path(file);
+				copyFile(source, tmpPlanFilesPath, source.getName());
+			}
+
+			// start python process
+			streamer = new PythonPlanStreamer(operatorConfig);
+			streamer.open(tmpPlanFilesDir, planArguments);
 
 			// Python process should terminate itself when all jobs have been run
 			while (streamer.preparePlanMode()) {
-				receivePlan();
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				receivePlan(env);
+
+				// upload files to remote FS and register on Distributed Cache
+				deleteIfExists(tmpDistributedDir);
+				FileCache.copy(tmpPlanFilesPath, tmpDistributedDir, true);
+				env.registerCachedFile(tmpDistributedDir.toUri().toString(), FLINK_PYTHON_DC_ID);
 
-				distributeFiles(tmpPath, env);
 				JobExecutionResult jer = env.execute();
-				sendResult(jer);
+				long runtime = jer.getNetRuntime();
+				streamer.sendRecord(runtime);
 
 				streamer.finishPlanMode();
+				sets.reset();
+			}
+		} finally {
+			try {
+				// clean up created files
+				FileSystem distributedFS = tmpDistributedDir.getFileSystem();
+				distributedFS.delete(tmpDistributedDir, true);
+
+				FileSystem local = FileSystem.getLocalFileSystem();
+				local.delete(new Path(tmpPlanFilesDir), true);
+			} catch (IOException ioe) {
+				LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage());
+			} finally {
+				if (streamer != null) {
+					streamer.close();
+				}
 			}
-
-			clearPath(tmpPath);
-			close();
-		} catch (Exception e) {
-			close();
-			throw e;
 		}
 	}
 
 	//=====Setup========================================================================================================
 
-	/**
-	 * Copies all files to a common directory {@link PythonOptions#PLAN_TMP_DIR}). This allows us to distribute it as
-	 * one big package which resolves PYTHONPATH issues.
-	 *
-	 * @param filePaths
-	 * @throws IOException
-	 * @throws URISyntaxException
-	 */
-	private void prepareFiles(String tempFilePath, String... filePaths) throws IOException, URISyntaxException {
-		//Flink python package
-		clearPath(tempFilePath);
-		FileCache.copy(new Path(pythonLibraryPath), new Path(tmpPlanFilesDir), false);
-
-		//plan file		
-		copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME);
-
-		//additional files/folders
-		for (int x = 1; x < filePaths.length; x++) {
-			copyFile(filePaths[x], tempFilePath, null);
+	private static void deleteIfExists(Path path) throws IOException {
+		FileSystem fs = path.getFileSystem();
+		if (fs.exists(path)) {
+			fs.delete(path, true);
 		}
 	}
 
-	private static void clearPath(String path) throws IOException {
-		FileSystem fs = FileSystem.get(new Path(path).toUri());
-		if (fs.exists(new Path(path))) {
-			fs.delete(new Path(path), true);
-		}
-	}
-
-	private static void copyFile(String path, String target, String name) throws IOException, URISyntaxException {
-		if (path.endsWith("/")) {
-			path = path.substring(0, path.length() - 1);
-		}
-		String identifier = name == null ? path.substring(path.lastIndexOf("/")) : name;
-		String tmpFilePath = target + "/" + identifier;
-		clearPath(tmpFilePath);
-		Path p = new Path(path);
-		FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true);
-	}
-
-	private void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException {
-		clearPath(tmpDistributedDir);
-		FileCache.copy(new Path(tmpPath), new Path(tmpDistributedDir), true);
-		env.registerCachedFile(new Path(tmpDistributedDir).toUri().toString(), FLINK_PYTHON_DC_ID);
-	}
-
-	private void startPython(String tempPath, String[] args) throws IOException {
-		StringBuilder arguments = new StringBuilder();
-		for (String arg : args) {
-			arguments.append(" ").append(arg);
-		}
-
-		operatorConfig.setString(PLAN_ARGUMENTS_KEY, arguments.toString());
-
-		streamer = new PythonPlanStreamer(operatorConfig);
-		streamer.open(tempPath, arguments.toString());
-	}
-
-	private void sendResult(JobExecutionResult jer) throws IOException {
-		long runtime = jer.getNetRuntime();
-		streamer.sendRecord(runtime);
-	}
-
-	private void close() {
-		try { //prevent throwing exception so that previous exceptions aren't hidden.
-			FileSystem hdfs = new Path(tmpDistributedDir).getFileSystem();
-			hdfs.delete(new Path(tmpDistributedDir), true);
-
-			FileSystem local = FileSystem.getLocalFileSystem();
-			local.delete(new Path(tmpPlanFilesDir), true);
-			streamer.close();
-		} catch (NullPointerException ignored) {
-		} catch (IOException ioe) {
-			LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage());
-		}
+	private static void copyFile(Path source, Path targetDirectory, String name) throws IOException {
+		Path targetFilePath = new Path(targetDirectory, name);
+		deleteIfExists(targetFilePath);
+		FileCache.copy(source, targetFilePath, true);
 	}
 
 	//====Plan==========================================================================================================
-	private void receivePlan() throws IOException {
-		env = ExecutionEnvironment.getExecutionEnvironment();
+	private void receivePlan(ExecutionEnvironment env) throws IOException {
 		//IDs used in HashMap of sets are only unique for each environment
-		sets.reset();
-		receiveParameters();
-		receiveOperations();
+		receiveParameters(env);
+		receiveOperations(env);
 	}
 
 	//====Environment===================================================================================================
@@ -260,7 +229,7 @@ public class PythonPlanBinder {
 		ID
 	}
 
-	private void receiveParameters() throws IOException {
+	private void receiveParameters(ExecutionEnvironment env) throws IOException {
 		for (int x = 0; x < 4; x++) {
 			Tuple value = (Tuple) streamer.getRecord(true);
 			switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
@@ -271,7 +240,7 @@ public class PythonPlanBinder {
 				case MODE:
 					if (value.<Boolean>getField(1)) {
 						LOG.info("Local execution specified, using default for {}.", PythonOptions.DC_TMP_DIR);
-						tmpDistributedDir = PythonOptions.DC_TMP_DIR.defaultValue();
+						tmpDistributedDir = new Path(PythonOptions.DC_TMP_DIR.defaultValue());
 					}
 					break;
 				case RETRY:
@@ -301,23 +270,23 @@ public class PythonPlanBinder {
 		COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION
 	}
 
-	private void receiveOperations() throws IOException {
+	private void receiveOperations(ExecutionEnvironment env) throws IOException {
 		Integer operationCount = (Integer) streamer.getRecord(true);
 		for (int x = 0; x < operationCount; x++) {
 			PythonOperationInfo info = new PythonOperationInfo(streamer, currentEnvironmentID);
 			Operation op = Operation.valueOf(info.identifier.toUpperCase());
 			switch (op) {
 				case SOURCE_CSV:
-					createCsvSource(info);
+					createCsvSource(env, info);
 					break;
 				case SOURCE_TEXT:
-					createTextSource(info);
+					createTextSource(env, info);
 					break;
 				case SOURCE_VALUE:
-					createValueSource(info);
+					createValueSource(env, info);
 					break;
 				case SOURCE_SEQ:
-					createSequenceSource(info);
+					createSequenceSource(env, info);
 					break;
 				case SINK_CSV:
 					createCsvSink(info);
@@ -395,12 +364,8 @@ public class PythonPlanBinder {
 		}
 	}
 
-	private int getParallelism(PythonOperationInfo info) {
-		return info.parallelism == -1 ? env.getParallelism() : info.parallelism;
-	}
-
 	@SuppressWarnings("unchecked")
-	private <T extends Tuple> void createCsvSource(PythonOperationInfo info) {
+	private <T extends Tuple> void createCsvSource(ExecutionEnvironment env, PythonOperationInfo info) {
 		if (!(info.types instanceof TupleTypeInfo)) {
 			throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
 		}
@@ -408,41 +373,41 @@ public class PythonPlanBinder {
 		String lineD = info.lineDelimiter;
 		String fieldD = info.fieldDelimiter;
 		TupleTypeInfo<T> types = (TupleTypeInfo<T>) info.types;
-		sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(getParallelism(info)).name("CsvSource")
-			.map(new SerializerMap<T>()).setParallelism(getParallelism(info)).name("CsvSourcePostStep"));
+		sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(info.parallelism).name("CsvSource")
+			.map(new SerializerMap<T>()).setParallelism(info.parallelism).name("CsvSourcePostStep"));
 	}
 
-	private void createTextSource(PythonOperationInfo info) {
-		sets.add(info.setID, env.readTextFile(info.path).setParallelism(getParallelism(info)).name("TextSource")
-			.map(new SerializerMap<String>()).setParallelism(getParallelism(info)).name("TextSourcePostStep"));
+	private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info) {
+		sets.add(info.setID, env.readTextFile(info.path).setParallelism(info.parallelism).name("TextSource")
+			.map(new SerializerMap<String>()).setParallelism(info.parallelism).name("TextSourcePostStep"));
 	}
 
-	private void createValueSource(PythonOperationInfo info) {
-		sets.add(info.setID, env.fromElements(info.values).setParallelism(getParallelism(info)).name("ValueSource")
-			.map(new SerializerMap<>()).setParallelism(getParallelism(info)).name("ValueSourcePostStep"));
+	private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
+		sets.add(info.setID, env.fromElements(info.values).setParallelism(info.parallelism).name("ValueSource")
+			.map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
 	}
 
-	private void createSequenceSource(PythonOperationInfo info) {
-		sets.add(info.setID, env.generateSequence(info.frm, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
-			.map(new SerializerMap<Long>()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
+	private void createSequenceSource(ExecutionEnvironment env, PythonOperationInfo info) {
+		sets.add(info.setID, env.generateSequence(info.frm, info.to).setParallelism(info.parallelism).name("SequenceSource")
+			.map(new SerializerMap<Long>()).setParallelism(info.parallelism).name("SequenceSourcePostStep"));
 	}
 
 	private void createCsvSink(PythonOperationInfo info) {
 		DataSet<byte[]> parent = sets.getDataSet(info.parentID);
-		parent.map(new StringTupleDeserializerMap()).setParallelism(getParallelism(info)).name("CsvSinkPreStep")
-				.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(getParallelism(info)).name("CsvSink");
+		parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
+				.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
 	}
 
 	private void createTextSink(PythonOperationInfo info) {
 		DataSet<byte[]> parent = sets.getDataSet(info.parentID);
-		parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info))
-			.writeAsText(info.path, info.writeMode).setParallelism(getParallelism(info)).name("TextSink");
+		parent.map(new StringDeserializerMap()).setParallelism(info.parallelism)
+			.writeAsText(info.path, info.writeMode).setParallelism(info.parallelism).name("TextSink");
 	}
 
 	private void createPrintSink(PythonOperationInfo info) {
 		DataSet<byte[]> parent = sets.getDataSet(info.parentID);
-		parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info)).name("PrintSinkPreStep")
-			.output(new PrintingOutputFormat<String>(info.toError)).setParallelism(getParallelism(info));
+		parent.map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep")
+			.output(new PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism);
 	}
 
 	private void createBroadcastVariable(PythonOperationInfo info) {
@@ -466,8 +431,8 @@ public class PythonPlanBinder {
 	private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
 		DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID);
 		DataSet<byte[]> result = op
-			.distinct(info.keys).setParallelism(getParallelism(info)).name("Distinct")
-			.map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("DistinctPostStep");
+			.distinct(info.keys).setParallelism(info.parallelism).name("Distinct")
+			.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
 		sets.add(info.setID, result);
 	}
 
@@ -475,17 +440,17 @@ public class PythonPlanBinder {
 		if (sets.isDataSet(info.parentID)) {
 			DataSet<byte[]> op = sets.getDataSet(info.parentID);
 			sets.add(info.setID, op
-				.first(info.count).setParallelism(getParallelism(info)).name("First"));
+				.first(info.count).setParallelism(info.parallelism).name("First"));
 		} else if (sets.isUnsortedGrouping(info.parentID)) {
 			UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID);
 			sets.add(info.setID, op
-				.first(info.count).setParallelism(getParallelism(info)).name("First")
-				.map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+				.first(info.count).setParallelism(info.parallelism).name("First")
+				.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
 		} else if (sets.isSortedGrouping(info.parentID)) {
 			SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID);
 			sets.add(info.setID, op
-				.first(info.count).setParallelism(getParallelism(info)).name("First")
-				.map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+				.first(info.count).setParallelism(info.parallelism).name("First")
+				.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
 		}
 	}
 
@@ -497,14 +462,14 @@ public class PythonPlanBinder {
 	private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
 		DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID);
 		DataSet<byte[]> result = op1
-			.partitionByHash(info.keys).setParallelism(getParallelism(info))
-			.map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("HashPartitionPostStep");
+			.partitionByHash(info.keys).setParallelism(info.parallelism)
+			.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
 		sets.add(info.setID, result);
 	}
 
 	private void createRebalanceOperation(PythonOperationInfo info) {
 		DataSet<?> op = sets.getDataSet(info.parentID);
-		sets.add(info.setID, op.rebalance().setParallelism(getParallelism(info)).name("Rebalance"));
+		sets.add(info.setID, op.rebalance().setParallelism(info.parallelism).name("Rebalance"));
 	}
 
 	private void createSortOperation(PythonOperationInfo info) {
@@ -520,7 +485,7 @@ public class PythonPlanBinder {
 	private <IN> void createUnionOperation(PythonOperationInfo info) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
 		DataSet<IN> op2 = sets.getDataSet(info.otherID);
-		sets.add(info.setID, op1.union(op2).setParallelism(getParallelism(info)).name("Union"));
+		sets.add(info.setID, op1.union(op2).setParallelism(info.parallelism).name("Union"));
 	}
 
 	private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
@@ -529,7 +494,7 @@ public class PythonPlanBinder {
 		Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType());
 		Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType());
 		PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type);
-		sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(getParallelism(info)));
+		sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(info.parallelism));
 	}
 
 	private <IN1, IN2, OUT> void createCrossOperation(DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) {
@@ -551,11 +516,11 @@ public class PythonPlanBinder {
 				throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
 		}
 
-		defaultResult.setParallelism(getParallelism(info));
+		defaultResult.setParallelism(info.parallelism);
 		if (info.usesUDF) {
 			sets.add(info.setID, defaultResult
 				.mapPartition(new PythonMapPartition<Tuple2<IN1, IN2>, OUT>(operatorConfig, info.envID, info.setID, type))
-				.setParallelism(getParallelism(info)).name(info.name));
+				.setParallelism(info.parallelism).name(info.name));
 		} else {
 			sets.add(info.setID, defaultResult.name("DefaultCross"));
 		}
@@ -565,14 +530,14 @@ public class PythonPlanBinder {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
 		sets.add(info.setID, op1
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name));
+			.setParallelism(info.parallelism).name(info.name));
 	}
 
 	private <IN, OUT> void createFlatMapOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
 		sets.add(info.setID, op1
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name));
+			.setParallelism(info.parallelism).name(info.name));
 	}
 
 	private void createGroupReduceOperation(PythonOperationInfo info) {
@@ -587,23 +552,23 @@ public class PythonPlanBinder {
 
 	private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
-			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
+			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(info.parallelism)
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name);
+			.setParallelism(info.parallelism).name(info.name);
 	}
 
 	private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
-			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep")
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name);
+			.setParallelism(info.parallelism).name(info.name);
 	}
 
 	private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
-			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep")
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name);
+			.setParallelism(info.parallelism).name(info.name);
 	}
 
 	private <IN1, IN2, OUT> void createJoinOperation(DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) {
@@ -611,11 +576,11 @@ public class PythonPlanBinder {
 		DataSet<IN2> op2 = sets.getDataSet(info.otherID);
 
 		if (info.usesUDF) {
-			sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))
+			sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, info.parallelism)
 				.mapPartition(new PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(operatorConfig, info.envID, info.setID, type))
-				.setParallelism(getParallelism(info)).name(info.name));
+				.setParallelism(info.parallelism).name(info.name));
 		} else {
-			sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)));
+			sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, info.parallelism));
 		}
 	}
 
@@ -642,14 +607,14 @@ public class PythonPlanBinder {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
 		sets.add(info.setID, op1
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name));
+			.setParallelism(info.parallelism).name(info.name));
 	}
 
 	private <IN, OUT> void createMapPartitionOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
 		DataSet<IN> op1 = sets.getDataSet(info.parentID);
 		sets.add(info.setID, op1
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name));
+			.setParallelism(info.parallelism).name(info.name));
 	}
 
 	private void createReduceOperation(PythonOperationInfo info) {
@@ -664,15 +629,15 @@ public class PythonPlanBinder {
 
 	private <IN, OUT> DataSet<OUT> applyReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
-			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep")
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name);
+			.setParallelism(info.parallelism).name(info.name);
 	}
 
 	private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
 		return op1
-			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+			.reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep")
 			.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-			.setParallelism(getParallelism(info)).name(info.name);
+			.setParallelism(info.parallelism).name(info.name);
 	}
 }

