flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject flink git commit: [FLINK-3275] [py] Support for DataSet.setParallelism()
Date Thu, 28 Jan 2016 10:51:14 GMT
Repository: flink
Updated Branches:
  refs/heads/master 440137cc3 -> 40422d505


[FLINK-3275] [py] Support for DataSet.setParallelism()

-parallelism is stored as a Value object within the OperationInfo, so it can be passed as a reference
to multiple operations (in cases where a set is internally executed as multiple operations)
-setParallelism is called for every DataSet with either a user-set value or env.getParallelism
-added a DataSink set, providing access to name() and setParallelism() for sinks


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40422d50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40422d50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40422d50

Branch: refs/heads/master
Commit: 40422d5057e5c1d7b75aec48bacbd7518cd7c9e1
Parents: 440137c
Author: zentol <s.motsu@web.de>
Authored: Thu Jan 28 10:00:25 2016 +0100
Committer: zentol <s.motsu@web.de>
Committed: Thu Jan 28 11:50:51 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   |   2 +
 .../flink/python/api/PythonPlanBinder.java      | 117 ++++++++++---------
 .../flink/python/api/flink/plan/DataSet.py      |  40 ++++++-
 .../flink/python/api/flink/plan/Environment.py  |   1 +
 .../python/api/flink/plan/OperationInfo.py      |   6 +
 5 files changed, 109 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 1e3005d..7f7a993 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -45,6 +45,7 @@ public class PythonOperationInfo {
 	public boolean toError;
 	public String name;
 	public boolean usesUDF;
+	public int parallelism;
 
 	public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException {
 		identifier = (String) streamer.getRecord();
@@ -90,6 +91,7 @@ public class PythonOperationInfo {
 		for (int x = 0; x < valueCount; x++) {
 			values[x] = streamer.getRecord();
 		}
+		parallelism = (Integer) streamer.getRecord(true);
 
 		/*
 		aggregates = new AggregationEntry[count];

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 3877ef1..1534ebf 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -388,42 +388,53 @@ public class PythonPlanBinder {
 		}
 	}
 
+	private int getParallelism(PythonOperationInfo info) {
+		return info.parallelism == -1 ? env.getParallelism() : info.parallelism;
+	}
+
 	private void createCsvSource(PythonOperationInfo info) throws IOException {
 		if (!(info.types instanceof TupleTypeInfo)) {
 			throw new RuntimeException("The output type of a csv source has to be a tuple. The derived
type is " + info);
 		}
-
-		sets.put(info.setID, env.createInput(new TupleCsvInputFormat(new Path(info.path),
-				info.lineDelimiter, info.fieldDelimiter, (TupleTypeInfo) info.types), info.types)
-				.name("CsvSource").map(new SerializerMap()).name("CsvSourcePostStep"));
+		Path path = new Path(info.path);
+		String lineD = info.lineDelimiter;
+		String fieldD = info.fieldDelimiter;
+		TupleTypeInfo<?> types = (TupleTypeInfo) info.types;
+		sets.put(info.setID, env.createInput(new TupleCsvInputFormat(path, lineD, fieldD, types),
info.types).setParallelism(getParallelism(info)).name("CsvSource")
+				.map(new SerializerMap()).setParallelism(getParallelism(info)).name("CsvSourcePostStep"));
 	}
 
 	private void createTextSource(PythonOperationInfo info) throws IOException {
-		sets.put(info.setID, env.readTextFile(info.path).name("TextSource").map(new SerializerMap()).name("TextSourcePostStep"));
+		sets.put(info.setID, env.readTextFile(info.path).setParallelism(getParallelism(info)).name("TextSource")
+				.map(new SerializerMap()).setParallelism(getParallelism(info)).name("TextSourcePostStep"));
 	}
 
 	private void createValueSource(PythonOperationInfo info) throws IOException {
-		sets.put(info.setID, env.fromElements(info.values).name("ValueSource").map(new SerializerMap()).name("ValueSourcePostStep"));
+		sets.put(info.setID, env.fromElements(info.values).setParallelism(getParallelism(info)).name("ValueSource")
+				.map(new SerializerMap()).setParallelism(getParallelism(info)).name("ValueSourcePostStep"));
 	}
 
 	private void createSequenceSource(PythonOperationInfo info) throws IOException {
-		sets.put(info.setID, env.generateSequence(info.from, info.to).name("SequenceSource").map(new
SerializerMap()).name("SequenceSourcePostStep"));
+		sets.put(info.setID, env.generateSequence(info.from, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
+				.map(new SerializerMap()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
 	}
 
 	private void createCsvSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.map(new StringTupleDeserializerMap()).name("CsvSinkPreStep")
-				.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).name("CsvSink");
+		parent.map(new StringTupleDeserializerMap()).setParallelism(getParallelism(info)).name("CsvSinkPreStep")
+				.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(getParallelism(info)).name("CsvSink");
 	}
 
 	private void createTextSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.map(new StringDeserializerMap()).writeAsText(info.path, info.writeMode).name("TextSink");
+		parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info))
+			.writeAsText(info.path, info.writeMode).setParallelism(getParallelism(info)).name("TextSink");
 	}
 
 	private void createPrintSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
-		parent.map(new StringDeserializerMap()).name("PrintSinkPreStep").output(new PrintingOutputFormat(info.toError));
+		parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info)).name("PrintSinkPreStep")
+			.output(new PrintingOutputFormat(info.toError)).setParallelism(getParallelism(info));
 	}
 
 	private void createBroadcastVariable(PythonOperationInfo info) throws IOException {
@@ -452,17 +463,18 @@ public class PythonPlanBinder {
 			ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field);
 		}
 
-		sets.put(info.setID, ao.name("Aggregation"));
+		sets.put(info.setID, ao.setParallelism(getParallelism(info)).name("Aggregation"));
 	}
 
 	private void createDistinctOperation(PythonOperationInfo info) throws IOException {
 		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.distinct(info.keys).name("Distinct").map(new KeyDiscarder()).name("DistinctPostStep"));
+		sets.put(info.setID, op.distinct(info.keys).setParallelism(getParallelism(info)).name("Distinct")
+				.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("DistinctPostStep"));
 	}
 
 	private void createFirstOperation(PythonOperationInfo info) throws IOException {
 		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.first(info.count).name("First"));
+		sets.put(info.setID, op.first(info.count).setParallelism(getParallelism(info)).name("First"));
 	}
 
 	private void createGroupOperation(PythonOperationInfo info) throws IOException {
@@ -472,12 +484,13 @@ public class PythonPlanBinder {
 
 	private void createHashPartitionOperation(PythonOperationInfo info) throws IOException {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.partitionByHash(info.keys).map(new KeyDiscarder()).name("HashPartitionPostStep"));
+		sets.put(info.setID, op1.partitionByHash(info.keys).setParallelism(getParallelism(info))
+				.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("HashPartitionPostStep"));
 	}
 
 	private void createRebalanceOperation(PythonOperationInfo info) throws IOException {
 		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.rebalance().name("Rebalance"));
+		sets.put(info.setID, op.rebalance().setParallelism(getParallelism(info)).name("Rebalance"));
 	}
 
 	private void createSortOperation(PythonOperationInfo info) throws IOException {
@@ -494,19 +507,16 @@ public class PythonPlanBinder {
 	private void createUnionOperation(PythonOperationInfo info) throws IOException {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		DataSet op2 = (DataSet) sets.get(info.otherID);
-		sets.put(info.setID, op1.union(op2).name("Union"));
+		sets.put(info.setID, op1.union(op2).setParallelism(getParallelism(info)).name("Union"));
 	}
 
 	private void createCoGroupOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		DataSet op2 = (DataSet) sets.get(info.otherID);
-		sets.put(info.setID, new CoGroupRawOperator(
-				op1,
-				op2,
-				new Keys.ExpressionKeys(info.keys1, op1.getType()),
-				new Keys.ExpressionKeys(info.keys2, op2.getType()),
-				new PythonCoGroup(info.setID, info.types),
-				info.types, info.name));
+		Keys.ExpressionKeys<?> key1 = new Keys.ExpressionKeys(info.keys1, op1.getType());
+		Keys.ExpressionKeys<?> key2 = new Keys.ExpressionKeys(info.keys2, op2.getType());
+		PythonCoGroup pcg = new PythonCoGroup(info.setID, info.types);
+		sets.put(info.setID, new CoGroupRawOperator(op1, op2, key1, key2, pcg, info.types, info.name).setParallelism(getParallelism(info)));
 	}
 
 	private void createCrossOperation(DatasizeHint mode, PythonOperationInfo info) {
@@ -527,8 +537,10 @@ public class PythonPlanBinder {
 			default:
 				throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
 		}
+
+		defaultResult.setParallelism(getParallelism(info));
 		if (info.usesUDF) {
-			sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+			sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 		} else {
 			sets.put(info.setID, defaultResult.name("DefaultCross"));
 		}
@@ -536,12 +548,12 @@ public class PythonPlanBinder {
 
 	private void createFilterOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createFlatMapOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createGroupReduceOperation(PythonOperationInfo info) {
@@ -560,24 +572,18 @@ public class PythonPlanBinder {
 	}
 
 	private DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonGroupReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
+				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info)
{
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonGroupReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info)
{
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonGroupReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private void createJoinOperation(DatasizeHint mode, PythonOperationInfo info) {
@@ -585,21 +591,24 @@ public class PythonPlanBinder {
 		DataSet op2 = (DataSet) sets.get(info.otherID);
 
 		if (info.usesUDF) {
-			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode)
-					.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))
+					.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 		} else {
-			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode));
+			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)));
 		}
 	}
 
-	private DataSet createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[]
secondKeys, DatasizeHint mode) {
+	private DataSet createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[]
secondKeys, DatasizeHint mode, int parallelism) {
 		switch (mode) {
 			case NONE:
-				return op1.join(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+				return op1.join(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			case HUGE:
-				return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+				return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			case TINY:
-				return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).map(new NestedKeyDiscarder()).name("DefaultJoinPostStep");
+				return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.map(new NestedKeyDiscarder()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			default:
 				throw new IllegalArgumentException("Invalid join mode specified.");
 		}
@@ -607,12 +616,12 @@ public class PythonPlanBinder {
 
 	private void createMapOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createMapPartitionOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createReduceOperation(PythonOperationInfo info) {
@@ -627,16 +636,12 @@ public class PythonPlanBinder {
 	}
 
 	private DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	private DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new IdentityGroupReduce())
-				.setCombinable(false).name("PythonReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID, info.types))
-				.name(info.name);
+		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 5bb34e5..e024a38 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -55,6 +55,22 @@ class CsvStringify(MapFunction):
             return str(value)
 
 
+class DataSink(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+        info.id = env._counter
+        env._counter += 1
+
+    def name(self, name):
+        self._info.name = name
+        return self
+
+    def set_parallelism(self, parallelism):
+        self._info.parallelism.value = parallelism
+        return self
+
+
 class DataSet(object):
     def __init__(self, env, info):
         self._env = env
@@ -66,15 +82,18 @@ class DataSet(object):
         """
         Writes a DataSet to the standard output stream (stdout).
         """
-        self.map(Stringify())._output(to_error)
+        return self.map(Stringify())._output(to_error)
 
     def _output(self, to_error):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_PRINT
         child.parent = self._info
         child.to_err = to_error
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
         """
@@ -87,12 +106,15 @@ class DataSet(object):
 
     def _write_text(self, path, write_mode):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_TEXT
         child.parent = self._info
         child.path = path
         child.write_mode = write_mode
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
         """
@@ -106,14 +128,17 @@ class DataSet(object):
 
     def _write_csv(self, path, line_delimiter, field_delimiter, write_mode):
         child = OperationInfo()
+        child_set = DataSink(self._env, child)
         child.identifier = _Identifier.SINK_CSV
         child.path = path
         child.parent = self._info
         child.delimiter_field = field_delimiter
         child.delimiter_line = line_delimiter
         child.write_mode = write_mode
+        self._info.parallelism = child.parallelism
         self._info.sinks.append(child)
         self._env._sinks.append(child)
+        return child_set
 
     def reduce_group(self, operator, combinable=False):
         """
@@ -303,6 +328,7 @@ class DataSet(object):
         child.identifier = _Identifier.DISTINCT
         child.parent = self._info
         child.keys = fields
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
@@ -498,6 +524,7 @@ class DataSet(object):
         child.identifier = _Identifier.PARTITION_HASH
         child.parent = self._info
         child.keys = fields
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
         return child_set
@@ -541,6 +568,10 @@ class DataSet(object):
         self._info.name = name
         return self
 
+    def set_parallelism(self, parallelism):
+        self._info.parallelism.value = parallelism
+        return self
+
 
 class OperatorSet(DataSet):
     def __init__(self, env, info):
@@ -611,6 +642,7 @@ class Grouping(object):
         child.types = _createArrayTypeInfo()
         child.name = "PythonGroupReduce"
         child.key1 = self._child_chain[0].keys
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
 
@@ -666,6 +698,7 @@ class UnsortedGrouping(Grouping):
         child.name = "PythonReduce"
         child.types = _createArrayTypeInfo()
         child.key1 = self._child_chain[0].keys
+        self._info.parallelism = child.parallelism
         self._info.children.append(child)
         self._env._sets.append(child)
 
@@ -830,6 +863,8 @@ class CoGroupOperatorUsing(object):
         self._info.key2 = tuple([x for x in range(len(self._info.key2))])
         operator._keys1 = self._info.key1
         operator._keys2 = self._info.key2
+        self._info.parent.parallelism = self._info.parallelism
+        self._info.other.parallelism = self._info.parallelism
         self._info.operator = operator
         self._info.types = _createArrayTypeInfo()
         self._info.name = "PythonCoGroup"
@@ -864,6 +899,7 @@ class JoinOperatorWhere(object):
         new_parent_set = self._info.parent_set.map(lambda x: (f(x), x))
         new_parent_set._info.types = _createKeyValueTypeInfo(len(fields))
         self._info.parent = new_parent_set._info
+        self._info.parent.parallelism = self._info.parallelism
         self._info.parent.children.append(self._info)
         self._info.key1 = tuple([x for x in range(len(fields))])
         return JoinOperatorTo(self._env, self._info)
@@ -895,6 +931,7 @@ class JoinOperatorTo(object):
         new_other_set = self._info.other_set.map(lambda x: (f(x), x))
         new_other_set._info.types = _createKeyValueTypeInfo(len(fields))
         self._info.other = new_other_set._info
+        self._info.other.parallelism = self._info.parallelism
         self._info.other.children.append(self._info)
         self._info.key2 = tuple([x for x in range(len(fields))])
         self._env._sets.append(self._info)
@@ -977,6 +1014,7 @@ class Projectable:
         child.parent = info
         child.types = _createArrayTypeInfo()
         child.name = "Projector"
+        child.parallelism = info.parallelism
         info.children.append(child)
         env._sets.append(child)
         return child_set

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index b410ead..a9f7f14 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -281,6 +281,7 @@ class Environment(object):
         collect(len(set.values))
         for value in set.values:
             collect(value)
+        collect(set.parallelism.value)
 
     def _receive_result(self):
         jer = JobExecutionResult()

http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index bd7d2b5..5d83e33 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -18,6 +18,11 @@
 from flink.plan.Constants import WriteMode
 
 
+class Value():
+    def __init__(self, value):
+        self.value = value
+
+
 class OperationInfo():
     def __init__(self, info=None):
         if info is None:
@@ -42,6 +47,7 @@ class OperationInfo():
             self.projections = []
             self.id = -1
             self.to_err = False
+            self.parallelism = Value(-1)
             #internally used
             self.parent_set = None
             self.other_set = None


Mime
View raw message