flink-commits mailing list archives

From ches...@apache.org
Subject flink git commit: [FLINK-3290] [py] Generalize OperationInfo transfer
Date Wed, 27 Jan 2016 14:07:26 GMT
Repository: flink
Updated Branches:
  refs/heads/master f681d9b83 -> 499b60fed


[FLINK-3290] [py] Generalize OperationInfo transfer

-identifier saved in Java OpInfo
-changed default values to prevent null exceptions
-all operations use the same routine to transfer parameters (sketched below)
-PyPlRcv can handle Tuple0
-labeled py OpInfo fields as transferred/internal
-fixed broadcast OpInfo not having the correct identifier
-removed unused projection code
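
To make the new scheme concrete, here is a minimal sketch in Python (illustrative only: the names and the exact field order are assumptions, not the actual Flink code). Every operation writes all of its OperationInfo fields in one fixed order, with unused fields carrying serializable defaults, so the receiving side can read the same fixed sequence without any per-operation branching:

    # Sketch: one transfer routine shared by every operation type.
    # FIELD_ORDER is an assumed subset; the real order is defined by
    # Environment._send_operation and mirrored on the Java side.
    FIELD_ORDER = ("identifier", "parent_id", "other_id", "field", "order",
                   "keys", "key1", "key2", "name", "path")

    def send_operation(collect, info):
        # Unused fields are sent anyway, as their defaults (-1, (), "", ...),
        # keeping sender and receiver in lockstep.
        for field_name in FIELD_ORDER:
            collect(getattr(info, field_name))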


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/499b60fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/499b60fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/499b60fe

Branch: refs/heads/master
Commit: 499b60fedd6db1cd0c1a4e1cc8c59a94b89c5c84
Parents: f681d9b
Author: zentol <s.motsu@web.de>
Authored: Wed Jan 27 14:52:27 2016 +0100
Committer: zentol <s.motsu@web.de>
Committed: Wed Jan 27 14:55:34 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   | 213 +++++--------------
 .../flink/python/api/PythonPlanBinder.java      | 207 ++++++++----------
 .../api/streaming/plan/PythonPlanReceiver.java  |   2 +-
 .../api/flink/functions/GroupReduceFunction.py  |   4 +-
 .../api/flink/functions/ReduceFunction.py       |   4 +-
 .../flink/python/api/flink/plan/DataSet.py      |   1 +
 .../flink/python/api/flink/plan/Environment.py  | 141 +++---------
 .../python/api/flink/plan/OperationInfo.py      |  30 +--
 8 files changed, 196 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 30a7133..1e3005d 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -20,10 +20,10 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple;
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.python.api.PythonPlanBinder.Operation;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 
 public class PythonOperationInfo {
+	public String identifier;
 	public int parentID; //DataSet that an operation is applied on
 	public int otherID; //secondary DataSet
 	public int setID; //ID for new DataSet
@@ -35,7 +35,6 @@ public class PythonOperationInfo {
 	public Object[] values;
 	public int count;
 	public String field;
-	public int[] fields;
 	public Order order;
 	public String path;
 	public String fieldDelimiter;
@@ -47,154 +46,59 @@ public class PythonOperationInfo {
 	public String name;
 	public boolean usesUDF;
 
-	public PythonOperationInfo(PythonPlanStreamer streamer, Operation identifier) throws IOException {
-		Object tmpType;
-		switch (identifier) {
-			case SOURCE_CSV:
-				setID = (Integer) streamer.getRecord(true);
-				path = (String) streamer.getRecord();
-				fieldDelimiter = (String) streamer.getRecord();
-				lineDelimiter = (String) streamer.getRecord();
-				tmpType = (Tuple) streamer.getRecord();
-				types = tmpType == null ? null : getForObject(tmpType);
-				return;
-			case SOURCE_TEXT:
-				setID = (Integer) streamer.getRecord(true);
-				path = (String) streamer.getRecord();
-				return;
-			case SOURCE_VALUE:
-				setID = (Integer) streamer.getRecord(true);
-				int valueCount = (Integer) streamer.getRecord(true);
-				values = new Object[valueCount];
-				for (int x = 0; x < valueCount; x++) {
-					values[x] = streamer.getRecord();
-				}
-				return;
-			case SOURCE_SEQ:
-				setID = (Integer) streamer.getRecord(true);
-				from = (Long) streamer.getRecord();
-				to = (Long) streamer.getRecord();
-				return;
-			case SINK_CSV:
-				parentID = (Integer) streamer.getRecord(true);
-				path = (String) streamer.getRecord();
-				fieldDelimiter = (String) streamer.getRecord();
-				lineDelimiter = (String) streamer.getRecord();
-				writeMode = ((Integer) streamer.getRecord(true)) == 1
-						? WriteMode.OVERWRITE
-						: WriteMode.NO_OVERWRITE;
-				return;
-			case SINK_TEXT:
-				parentID = (Integer) streamer.getRecord(true);
-				path = (String) streamer.getRecord();
-				writeMode = ((Integer) streamer.getRecord(true)) == 1
-						? WriteMode.OVERWRITE
-						: WriteMode.NO_OVERWRITE;
-				return;
-			case SINK_PRINT:
-				parentID = (Integer) streamer.getRecord(true);
-				toError = (Boolean) streamer.getRecord();
-				return;
-			case BROADCAST:
-				parentID = (Integer) streamer.getRecord(true);
-				otherID = (Integer) streamer.getRecord(true);
-				name = (String) streamer.getRecord();
-				return;
-		}
-		setID = (Integer) streamer.getRecord(true);
+	public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException {
+		identifier = (String) streamer.getRecord();
 		parentID = (Integer) streamer.getRecord(true);
-		switch (identifier) {
-			case AGGREGATE:
-				count = (Integer) streamer.getRecord(true);
-				aggregates = new AggregationEntry[count];
-				for (int x = 0; x < count; x++) {
-					int encodedAgg = (Integer) streamer.getRecord(true);
-					int field = (Integer) streamer.getRecord(true);
-					aggregates[x] = new AggregationEntry(encodedAgg, field);
-				}
-				return;
-			case FIRST:
-				count = (Integer) streamer.getRecord(true);
-				return;
-			case DISTINCT:
-			case GROUPBY:
-			case PARTITION_HASH:
-				keys = normalizeKeys(streamer.getRecord(true));
-				return;
-			case PROJECTION:
-				fields = toIntArray(streamer.getRecord(true));
-				return;
-			case REBALANCE:
-				return;
-			case SORT:
-				field = "f0.f" + (Integer) streamer.getRecord(true);
-				int encodedOrder = (Integer) streamer.getRecord(true);
-				switch (encodedOrder) {
-					case 0:
-						order = Order.NONE;
-						break;
-					case 1:
-						order = Order.ASCENDING;
-						break;
-					case 2:
-						order = Order.DESCENDING;
-						break;
-					case 3:
-						order = Order.ANY;
-						break;
-					default:
-						order = Order.NONE;
-						break;
-				}
-				return;
-			case UNION:
-				otherID = (Integer) streamer.getRecord(true);
-				return;
-			case COGROUP:
-				otherID = (Integer) streamer.getRecord(true);
-				keys1 = normalizeKeys(streamer.getRecord(true));
-				keys2 = normalizeKeys(streamer.getRecord(true));
-				tmpType = streamer.getRecord();
-				types = tmpType == null ? null : getForObject(tmpType);
-				name = (String) streamer.getRecord();
-				return;
-			case CROSS:
-			case CROSS_H:
-			case CROSS_T:
-				otherID = (Integer) streamer.getRecord(true);
-				usesUDF = (Boolean) streamer.getRecord();
-				tmpType = streamer.getRecord();
-				types = tmpType == null ? null : getForObject(tmpType);
-				name = (String) streamer.getRecord();
-				return;
-			case REDUCE:
-			case GROUPREDUCE:
-				tmpType = streamer.getRecord();
-				types = tmpType == null ? null : getForObject(tmpType);
-				name = (String) streamer.getRecord();
-				return;
-			case JOIN:
-			case JOIN_H:
-			case JOIN_T:
-				keys1 = normalizeKeys(streamer.getRecord(true));
-				keys2 = normalizeKeys(streamer.getRecord(true));
-				otherID = (Integer) streamer.getRecord(true);
-				usesUDF = (Boolean) streamer.getRecord();
-				tmpType = streamer.getRecord();
-				types = tmpType == null ? null : getForObject(tmpType);
-				name = (String) streamer.getRecord();
-				return;
-			case MAPPARTITION:
-			case FLATMAP:
-			case MAP:
-			case FILTER:
-				tmpType = streamer.getRecord();
-				types = tmpType == null ? null : getForObject(tmpType);
-				name = (String) streamer.getRecord();
-				return;
+		otherID = (Integer) streamer.getRecord(true);
+		field = "f0.f" + (Integer) streamer.getRecord(true);
+		int encodedOrder = (Integer) streamer.getRecord(true);
+		switch (encodedOrder) {
+			case 0:
+				order = Order.NONE;
+				break;
+			case 1:
+				order = Order.ASCENDING;
+				break;
+			case 2:
+				order = Order.DESCENDING;
+				break;
+			case 3:
+				order = Order.ANY;
+				break;
 			default:
-				throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier);
+				order = Order.NONE;
+				break;
+		}
+		keys = normalizeKeys(streamer.getRecord(true));
+		keys1 = normalizeKeys(streamer.getRecord(true));
+		keys2 = normalizeKeys(streamer.getRecord(true));
+		Object tmpType = streamer.getRecord();
+		types = tmpType == null ? null : getForObject(tmpType);
+		usesUDF = (Boolean) streamer.getRecord();
+		name = (String) streamer.getRecord();
+		lineDelimiter = (String) streamer.getRecord();
+		fieldDelimiter = (String) streamer.getRecord();
+		writeMode = ((Integer) streamer.getRecord(true)) == 1
+			? WriteMode.OVERWRITE
+			: WriteMode.NO_OVERWRITE;
+		path = (String) streamer.getRecord();
+		setID = (Integer) streamer.getRecord(true);
+		toError = (Boolean) streamer.getRecord();
+		count = (Integer) streamer.getRecord(true);
+		int valueCount = (Integer) streamer.getRecord(true);
+		values = new Object[valueCount];
+		for (int x = 0; x < valueCount; x++) {
+			values[x] = streamer.getRecord();
 		}
+
+		/*
+		aggregates = new AggregationEntry[count];
+		for (int x = 0; x < count; x++) {
+			int encodedAgg = (Integer) streamer.getRecord(true);
+			int field = (Integer) streamer.getRecord(true);
+			aggregates[x] = new AggregationEntry(encodedAgg, field);
+		}
+		*/
 	}
 
 	@Override
@@ -283,21 +187,6 @@ public class PythonOperationInfo {
 		throw new RuntimeException("Key argument is neither an int[] nor a Tuple: " + keys.toString());
 	}
 
-	private static int[] toIntArray(Object key) {
-		if (key instanceof Tuple) {
-			Tuple tuple = (Tuple) key;
-			int[] keys = new int[tuple.getArity()];
-			for (int y = 0; y < tuple.getArity(); y++) {
-				keys[y] = (Integer) tuple.getField(y);
-			}
-			return keys;
-		}
-		if (key instanceof int[]) {
-			return (int[]) key;
-		}
-		throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
-	}
-
 	private static String[] tupleToStringArray(Tuple tuple) {
 		String[] keys = new String[tuple.getArity()];
 		for (int y = 0; y < tuple.getArity(); y++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 5b0d846..3877ef1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -280,7 +280,7 @@ public class PythonPlanBinder {
 	 */
 	protected enum Operation {
 		SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT,
-		PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
+		SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
 		REBALANCE, PARTITION_HASH,
 		BROADCAST,
 	COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION
@@ -289,121 +289,105 @@ public class PythonPlanBinder {
 	private void receiveOperations() throws IOException {
 		Integer operationCount = (Integer) streamer.getRecord(true);
 		for (int x = 0; x < operationCount; x++) {
-			String identifier = (String) streamer.getRecord();
-			Operation op = null;
+			PythonOperationInfo info = new PythonOperationInfo(streamer);
+			Operation op;
 			try {
-				op = Operation.valueOf(identifier.toUpperCase());
+				op = Operation.valueOf(info.identifier.toUpperCase());
 			} catch (IllegalArgumentException iae) {
-				throw new IllegalArgumentException("Invalid operation specified: " + identifier);
+				throw new IllegalArgumentException("Invalid operation specified: " + info.identifier);
 			}
-			if (op != null) {
-				switch (op) {
-					case SOURCE_CSV:
-						createCsvSource(createOperationInfo(op));
-						break;
-					case SOURCE_TEXT:
-						createTextSource(createOperationInfo(op));
-						break;
-					case SOURCE_VALUE:
-						createValueSource(createOperationInfo(op));
-						break;
-					case SOURCE_SEQ:
-						createSequenceSource(createOperationInfo(op));
-						break;
-					case SINK_CSV:
-						createCsvSink(createOperationInfo(op));
-						break;
-					case SINK_TEXT:
-						createTextSink(createOperationInfo(op));
-						break;
-					case SINK_PRINT:
-						createPrintSink(createOperationInfo(op));
-						break;
-					case BROADCAST:
-						createBroadcastVariable(createOperationInfo(op));
-						break;
-					case AGGREGATE:
-						createAggregationOperation(createOperationInfo(op));
-						break;
-					case DISTINCT:
-						createDistinctOperation(createOperationInfo(op));
-						break;
-					case FIRST:
-						createFirstOperation(createOperationInfo(op));
-						break;
-					case PARTITION_HASH:
-						createHashPartitionOperation(createOperationInfo(op));
-						break;
-					case PROJECTION:
-						createProjectOperation(createOperationInfo(op));
-						break;
-					case REBALANCE:
-						createRebalanceOperation(createOperationInfo(op));
-						break;
-					case GROUPBY:
-						createGroupOperation(createOperationInfo(op));
-						break;
-					case SORT:
-						createSortOperation(createOperationInfo(op));
-						break;
-					case UNION:
-						createUnionOperation(createOperationInfo(op));
-						break;
-					case COGROUP:
-						createCoGroupOperation(createOperationInfo(op));
-						break;
-					case CROSS:
-						createCrossOperation(NONE, createOperationInfo(op));
-						break;
-					case CROSS_H:
-						createCrossOperation(HUGE, createOperationInfo(op));
-						break;
-					case CROSS_T:
-						createCrossOperation(TINY, createOperationInfo(op));
-						break;
-					case FILTER:
-						createFilterOperation(createOperationInfo(op));
-						break;
-					case FLATMAP:
-						createFlatMapOperation(createOperationInfo(op));
-						break;
-					case GROUPREDUCE:
-						createGroupReduceOperation(createOperationInfo(op));
-						break;
-					case JOIN:
-						createJoinOperation(NONE, createOperationInfo(op));
-						break;
-					case JOIN_H:
-						createJoinOperation(HUGE, createOperationInfo(op));
-						break;
-					case JOIN_T:
-						createJoinOperation(TINY, createOperationInfo(op));
-						break;
-					case MAP:
-						createMapOperation(createOperationInfo(op));
-						break;
-					case MAPPARTITION:
-						createMapPartitionOperation(createOperationInfo(op));
-						break;
-					case REDUCE:
-						createReduceOperation(createOperationInfo(op));
-						break;
-				}
+			switch (op) {
+				case SOURCE_CSV:
+					createCsvSource(info);
+					break;
+				case SOURCE_TEXT:
+					createTextSource(info);
+					break;
+				case SOURCE_VALUE:
+					createValueSource(info);
+					break;
+				case SOURCE_SEQ:
+					createSequenceSource(info);
+					break;
+				case SINK_CSV:
+					createCsvSink(info);
+					break;
+				case SINK_TEXT:
+					createTextSink(info);
+					break;
+				case SINK_PRINT:
+					createPrintSink(info);
+					break;
+				case BROADCAST:
+					createBroadcastVariable(info);
+					break;
+				case AGGREGATE:
+					createAggregationOperation(info);
+					break;
+				case DISTINCT:
+					createDistinctOperation(info);
+					break;
+				case FIRST:
+					createFirstOperation(info);
+					break;
+				case PARTITION_HASH:
+					createHashPartitionOperation(info);
+					break;
+				case REBALANCE:
+					createRebalanceOperation(info);
+					break;
+				case GROUPBY:
+					createGroupOperation(info);
+					break;
+				case SORT:
+					createSortOperation(info);
+					break;
+				case UNION:
+					createUnionOperation(info);
+					break;
+				case COGROUP:
+					createCoGroupOperation(info);
+					break;
+				case CROSS:
+					createCrossOperation(NONE, info);
+					break;
+				case CROSS_H:
+					createCrossOperation(HUGE, info);
+					break;
+				case CROSS_T:
+					createCrossOperation(TINY, info);
+					break;
+				case FILTER:
+					createFilterOperation(info);
+					break;
+				case FLATMAP:
+					createFlatMapOperation(info);
+					break;
+				case GROUPREDUCE:
+					createGroupReduceOperation(info);
+					break;
+				case JOIN:
+					createJoinOperation(NONE, info);
+					break;
+				case JOIN_H:
+					createJoinOperation(HUGE, info);
+					break;
+				case JOIN_T:
+					createJoinOperation(TINY, info);
+					break;
+				case MAP:
+					createMapOperation(info);
+					break;
+				case MAPPARTITION:
+					createMapPartitionOperation(info);
+					break;
+				case REDUCE:
+					createReduceOperation(info);
+					break;
 			}
 		}
 	}
 
-	/**
-	 * This method creates an OperationInfo object based on the operation-identifier passed.
-	 *
-	 * @param operationIdentifier
-	 * @return
-	 * @throws IOException
-	 */
-	private PythonOperationInfo createOperationInfo(Operation operationIdentifier) throws IOException {
-		return new PythonOperationInfo(streamer, operationIdentifier);
-	}
-
 	private void createCsvSource(PythonOperationInfo info) throws IOException {
 		if (!(info.types instanceof TupleTypeInfo)) {
 		throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
@@ -491,11 +475,6 @@ public class PythonPlanBinder {
 		sets.put(info.setID, op1.partitionByHash(info.keys).map(new KeyDiscarder()).name("HashPartitionPostStep"));
 	}
 
-	private void createProjectOperation(PythonOperationInfo info) throws IOException {
-		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.project(info.fields).name("Projection"));
-	}
-
 	private void createRebalanceOperation(PythonOperationInfo info) throws IOException {
 		DataSet op = (DataSet) sets.get(info.parentID);
 		sets.put(info.setID, op.rebalance().name("Rebalance"));

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
index 6d2dcd1..a54b8dd 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -49,7 +49,7 @@ public class PythonPlanReceiver implements Serializable {
 
 	private Deserializer getDeserializer() throws IOException {
 		byte type = (byte) input.readByte();
-		if (type > 0 && type < 26) {
+		if (type >= 0 && type < 26) {
 				Deserializer[] d = new Deserializer[type];
 				for (int x = 0; x < d.length; x++) {
 					d[x] = getDeserializer();
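
This one-character change is what lets the receiver handle Tuple0: the type byte encodes a tuple's arity, and arity 0 must now be accepted because the generalized transfer sends empty tuples as defaults for unused key fields. A Python sketch of the recursive dispatch, under that assumed encoding:

    # Sketch: a type byte in [0, 26) means "tuple of that arity"; build
    # one child deserializer per element. Arity 0 builds no children and
    # yields the empty tuple, i.e. Tuple0.
    def get_deserializer(read_byte):
        arity = read_byte()
        if 0 <= arity < 26:
            children = [get_deserializer(read_byte) for _ in range(arity)]
            return lambda: tuple(child() for child in children)
        raise NotImplementedError("non-tuple types elided in this sketch")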

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index 340497d..8d1934c 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -27,7 +27,7 @@ class GroupReduceFunction(Function.Function):
 
     def _configure(self, input_file, output_file, port, env, info):
         super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info)
-        if info.key1 is None:
+        if len(info.key1) == 0:
             self._run = self._run_all_group_reduce
         else:
             self._run = self._run_grouped_group_reduce
@@ -63,4 +63,4 @@ class GroupReduceFunction(Function.Function):
         pass
 
     def combine(self, iterator, collector):
-        self.reduce(iterator, collector)
\ No newline at end of file
+        self.reduce(iterator, collector)

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index 95e8b8a..b1d2201 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -26,7 +26,7 @@ class ReduceFunction(Function.Function):
 
     def _configure(self, input_file, output_file, port, env, info):
         super(ReduceFunction, self)._configure(input_file, output_file, port, env, info)
-        if info.key1 is None:
+        if len(info.key1) == 0:
             self._run = self._run_all_reduce
         else:
             self._run = self._run_grouped_reduce
@@ -64,4 +64,4 @@ class ReduceFunction(Function.Function):
         pass
 
     def combine(self, value1, value2):
-        return self.reduce(value1, value2)
\ No newline at end of file
+        return self.reduce(value1, value2)
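
Both reduce functions switch from "info.key1 is None" to a length check because key fields now default to an empty tuple instead of None (see the OperationInfo.py changes below). The resulting dispatch, sketched:

    # Sketch: with keys defaulting to (), "not grouped" is an empty
    # container rather than None, so a length check selects the path.
    def pick_run(info, run_all, run_grouped):
        return run_all if len(info.key1) == 0 else run_grouped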

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 3132651..5bb34e5 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -548,6 +548,7 @@ class OperatorSet(DataSet):
 
     def with_broadcast_set(self, name, set):
         child = OperationInfo()
+        child.identifier = _Identifier.BROADCAST
         child.parent = self._info
         child.other = set._info
         child.name = name

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 777f30b..b410ead 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -238,11 +238,7 @@ class Environment(object):
 
     def _send_plan(self):
         self._send_parameters()
-        self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
-        self._send_sources()
         self._send_operations()
-        self._send_sinks()
-        self._send_broadcast()
 
     def _send_parameters(self):
         collect = self._collector.collect
@@ -251,117 +247,40 @@ class Environment(object):
         collect(("mode", self._local_mode))
         collect(("retry", self._retry))
 
-    def _send_sources(self):
-        for source in self._sources:
-            identifier = source.identifier
-            collect = self._collector.collect
-            collect(identifier)
-            collect(source.id)
-            for case in Switch(identifier):
-                if case(_Identifier.SOURCE_CSV):
-                    collect(source.path)
-                    collect(source.delimiter_field)
-                    collect(source.delimiter_line)
-                    collect(source.types)
-                    break
-                if case(_Identifier.SOURCE_TEXT):
-                    collect(source.path)
-                    break
-                if case(_Identifier.SOURCE_VALUE):
-                    collect(len(source.values))
-                    for value in source.values:
-                        collect(value)
-                    break
-
     def _send_operations(self):
-        collect = self._collector.collect
+        self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
+        for source in self._sources:
+            self._send_operation(source)
         for set in self._sets:
-            identifier = set.identifier
-            collect(set.identifier)
-            collect(set.id)
-            collect(set.parent.id)
-            for case in Switch(identifier):
-                if case(_Identifier.REBALANCE):
-                    break
-                if case(_Identifier.DISTINCT, _Identifier.PARTITION_HASH):
-                    collect(set.keys)
-                    break
-                if case(_Identifier.FIRST):
-                    collect(set.count)
-                    break
-                if case(_Identifier.SORT):
-                    collect(set.field)
-                    collect(set.order)
-                    break
-                if case(_Identifier.GROUP):
-                    collect(set.keys)
-                    break
-                if case(_Identifier.COGROUP):
-                    collect(set.other.id)
-                    collect(set.key1)
-                    collect(set.key2)
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
-                    collect(set.other.id)
-                    collect(set.uses_udf)
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
-                    collect(set.key1)
-                    collect(set.key2)
-                    collect(set.other.id)
-                    collect(set.uses_udf)
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
-                    collect(set.types)
-                    collect(set.name)
-                    break
-                if case(_Identifier.UNION):
-                    collect(set.other.id)
-                    break
-                if case(_Identifier.PROJECTION):
-                    collect(set.keys)
-                    break
-                if case():
-                    raise KeyError("Environment._send_child_sets(): Invalid operation identifier: " + str(identifier))
-
-    def _send_sinks(self):
+            self._send_operation(set)
         for sink in self._sinks:
-            identifier = sink.identifier
-            collect = self._collector.collect
-            collect(identifier)
-            collect(sink.parent.id)
-            for case in Switch(identifier):
-                if case(_Identifier.SINK_CSV):
-                    collect(sink.path)
-                    collect(sink.delimiter_field)
-                    collect(sink.delimiter_line)
-                    collect(sink.write_mode)
-                    break;
-                if case(_Identifier.SINK_TEXT):
-                    collect(sink.path)
-                    collect(sink.write_mode)
-                    break
-                if case(_Identifier.SINK_PRINT):
-                    collect(sink.to_err)
-                    break
-
-    def _send_broadcast(self):
+            self._send_operation(sink)
+        for bcv in self._broadcast:
+            self._send_operation(bcv)
+
+    def _send_operation(self, set):
         collect = self._collector.collect
-        for entry in self._broadcast:
-            collect(_Identifier.BROADCAST)
-            collect(entry.parent.id)
-            collect(entry.other.id)
-            collect(entry.name)
+        collect(set.identifier)
+        collect(set.parent.id if set.parent is not None else -1)
+        collect(set.other.id if set.other is not None else -1)
+        collect(set.field)
+        collect(set.order)
+        collect(set.keys)
+        collect(set.key1)
+        collect(set.key2)
+        collect(set.types)
+        collect(set.uses_udf)
+        collect(set.name)
+        collect(set.delimiter_line)
+        collect(set.delimiter_field)
+        collect(set.write_mode)
+        collect(set.path)
+        collect(set.id)
+        collect(set.to_err)
+        collect(set.count)
+        collect(len(set.values))
+        for value in set.values:
+            collect(value)
 
     def _receive_result(self):
         jer = JobExecutionResult()
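
With the unified _send_operation, sources, sets, sinks, and broadcast variables all travel through one routine; fields an operation does not use simply go over the wire as their defaults. A hedged usage sketch (the identifier constant and the parent info object are assumptions drawn from elsewhere in this codebase):

    # Sketch: a print sink uses almost none of the fields, yet is sent
    # whole; other becomes -1, keys become (), path becomes "", and so on.
    sink = OperationInfo()
    sink.identifier = _Identifier.SINK_PRINT  # constant assumed from Constants.py
    sink.parent = parent_info                 # hypothetical parent OperationInfo
    env._send_operation(sink)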

http://git-wip-us.apache.org/repos/asf/flink/blob/499b60fe/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index 6eb228c..bd7d2b5 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -21,33 +21,35 @@ from flink.plan.Constants import WriteMode
 class OperationInfo():
     def __init__(self, info=None):
         if info is None:
+            #fields being transferred to the java side
+            self.identifier = -1
             self.parent = None
             self.other = None
-            self.parent_set = None
-            self.other_set = None
-            self.identifier = None
-            self.field = None
-            self.order = None
-            self.keys = None
-            self.key1 = None
-            self.key2 = None
+            self.field = -1
+            self.order = 0
+            self.keys = ()
+            self.key1 = ()
+            self.key2 = ()
             self.types = None
-            self.operator = None
             self.uses_udf = False
             self.name = None
             self.delimiter_line = "\n"
             self.delimiter_field = ","
             self.write_mode = WriteMode.NO_OVERWRITE
-            self.sinks = []
-            self.children = []
-            self.path = None
+            self.path = ""
             self.count = 0
             self.values = []
             self.projections = []
-            self.bcvars = []
-            self.id = None
+            self.id = -1
             self.to_err = False
+            #internally used
+            self.parent_set = None
+            self.other_set = None
             self.chained_info = None
+            self.bcvars = []
+            self.sinks = []
+            self.children = []
+            self.operator = None
         else:
             self.__dict__.update(info.__dict__)
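
The new defaults double as wire sentinels: every field in the transferred group is initialized to something serializable (-1 for ids, empty tuples for keys, an empty string for the path), which is what prevents the null exceptions mentioned in the commit message; types stays None and gets an explicit null check on the Java side. Illustrative:

    # Sketch: a freshly constructed OperationInfo already carries
    # serializable values for the transferred fields.
    info = OperationInfo()
    assert info.keys == () and info.id == -1 and info.path == ""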
 

