flink-commits mailing list archives

From: ches...@apache.org
Subject: flink git commit: [FLINK-3063][py] Remove combiner
Date: Wed, 20 Jan 2016 09:19:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master c7ada8d78 -> 54b52c9be


[FLINK-3063][py] Remove combiner

Removes combiner support from the Python API: the per-operation "combine"
flag, the PythonCombineIdentity function, and the chained-combine logic in
the Python GroupReduce/Reduce runners. GroupReduce and Reduce operations now
always execute as a non-combinable IdentityGroupReduce pre-step followed by
a Python mapPartition operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54b52c9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54b52c9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54b52c9b

Branch: refs/heads/master
Commit: 54b52c9be66a981360fed121fcfd7f1be28ac1cb
Parents: c7ada8d
Author: zentol <chesnay@apache.org>
Authored: Mon Nov 23 19:27:30 2015 +0100
Committer: zentol <s.motsu@web.de>
Committed: Wed Jan 20 07:39:59 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   |  3 -
 .../flink/python/api/PythonPlanBinder.java      | 64 ++++---------
 .../api/functions/IdentityGroupReduce.java      | 25 ++++++
 .../api/functions/PythonCombineIdentity.java    | 79 ----------------
 .../api/flink/functions/GroupReduceFunction.py  | 94 +++++---------------
 .../api/flink/functions/ReduceFunction.py       | 89 +++++-------------
 .../flink/python/api/flink/plan/DataSet.py      | 25 +-----
 .../flink/python/api/flink/plan/Environment.py  | 72 +++++----------
 .../python/api/flink/plan/OperationInfo.py      |  2 -
 .../org/apache/flink/python/api/test_main.py    | 29 ------
 10 files changed, 109 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 5cf3621..6ecd683 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -33,7 +33,6 @@ public class PythonOperationInfo {
 	public TypeInformation<?> types; //typeinformation about output type
 	public AggregationEntry[] aggregates;
 	public ProjectionEntry[] projections; //projectFirst/projectSecond
-	public boolean combine;
 	public Object[] values;
 	public int count;
 	public int field;
@@ -178,7 +177,6 @@ public class PythonOperationInfo {
 			case GROUPREDUCE:
 				tmpType = streamer.getRecord();
 				types = tmpType == null ? null : getForObject(tmpType);
-				combine = (Boolean) streamer.getRecord();
 				name = (String) streamer.getRecord();
 				return;
 			case JOIN:
@@ -224,7 +222,6 @@ public class PythonOperationInfo {
 		sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
 		sb.append("Aggregates: ").append(Arrays.toString(aggregates)).append("\n");
 		sb.append("Projections: ").append(Arrays.toString(projections)).append("\n");
-		sb.append("Combine: ").append(combine).append("\n");
 		sb.append("Count: ").append(count).append("\n");
 		sb.append("Field: ").append(field).append("\n");
 		sb.append("Order: ").append(order.toString()).append("\n");

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index d178dcb..2e64a56 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -48,7 +48,7 @@ import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
 import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY;
 import org.apache.flink.python.api.PythonOperationInfo.ProjectionEntry;
 import org.apache.flink.python.api.functions.PythonCoGroup;
-import org.apache.flink.python.api.functions.PythonCombineIdentity;
+import org.apache.flink.python.api.functions.IdentityGroupReduce;
 import org.apache.flink.python.api.functions.PythonMapPartition;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 import org.apache.flink.runtime.filecache.FileCache;
@@ -592,45 +592,24 @@ public class PythonPlanBinder {
 	}
 
 	private DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
-		if (info.combine) {
-			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
-					.setCombinable(true).name("PythonCombine")
-					.mapPartition(new PythonMapPartition(info.setID, info.types))
-					.name(info.name);
-		} else {
-			return op1.reduceGroup(new PythonCombineIdentity())
-					.setCombinable(false).name("PythonGroupReducePreStep")
-					.mapPartition(new PythonMapPartition(info.setID, info.types))
-					.name(info.name);
-		}
+		return op1.reduceGroup(new IdentityGroupReduce())
+				.setCombinable(false).name("PythonGroupReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types))
+				.name(info.name);
 	}
 
 	private DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
-		if (info.combine) {
-			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
-					.setCombinable(true).name("PythonCombine")
-					.mapPartition(new PythonMapPartition(info.setID, info.types))
-					.name(info.name);
-		} else {
-			return op1.reduceGroup(new PythonCombineIdentity())
-					.setCombinable(false).name("PythonGroupReducePreStep")
-					.mapPartition(new PythonMapPartition(info.setID, info.types))
-					.name(info.name);
-		}
+		return op1.reduceGroup(new IdentityGroupReduce())
+				.setCombinable(false).name("PythonGroupReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types))
+				.name(info.name);
 	}
 
 	private DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) {
-		if (info.combine) {
-			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
-					.setCombinable(true).name("PythonCombine")
-					.mapPartition(new PythonMapPartition(info.setID, info.types))
-					.name(info.name);
-		} else {
-			return op1.reduceGroup(new PythonCombineIdentity())
-					.setCombinable(false).name("PythonGroupReducePreStep")
-					.mapPartition(new PythonMapPartition(info.setID, info.types))
-					.name(info.name);
-		}
+		return op1.reduceGroup(new IdentityGroupReduce())
+				.setCombinable(false).name("PythonGroupReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types))
+				.name(info.name);
 	}
 
 	private void createJoinOperation(DatasizeHint mode, PythonOperationInfo info) {
@@ -696,23 +675,16 @@ public class PythonPlanBinder {
 	}
 
 	private DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.reduceGroup(new PythonCombineIdentity())
+		return op1.reduceGroup(new IdentityGroupReduce())
 				.setCombinable(false).name("PythonReducePreStep")
 				.mapPartition(new PythonMapPartition(info.setID, info.types))
 				.name(info.name);
 	}
 
 	private DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
-		if (info.combine) {
-			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
-					.setCombinable(true).name("PythonCombine")
-					.mapPartition(new PythonMapPartition(info.setID, info.types))
-					.name(info.name);
-		} else {
-			return op1.reduceGroup(new PythonCombineIdentity())
-					.setCombinable(false).name("PythonReducePreStep")
-					.mapPartition(new PythonMapPartition(info.setID, info.types))
-					.name(info.name);
-		}
+		return op1.reduceGroup(new IdentityGroupReduce())
+				.setCombinable(false).name("PythonReducePreStep")
+				.mapPartition(new PythonMapPartition(info.setID, info.types))
+				.name(info.name);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/IdentityGroupReduce.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/IdentityGroupReduce.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/IdentityGroupReduce.java
new file mode 100644
index 0000000..a420153
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/IdentityGroupReduce.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.functions;
+
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+
+public class IdentityGroupReduce<IN> implements GroupReduceFunction<IN, IN> {
+	@Override
+	public final void reduce(Iterable<IN> values, Collector<IN> out) throws Exception {
+		for (IN value : values) {
+			out.collect(value);
+		}
+	}
+}
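
The new pre-step is a plain pass-through: reduce() forwards every record unchanged. Judging from how PythonPlanBinder wires it in front of PythonMapPartition, its job is simply to materialize each group before the records are streamed to the Python process. A rough Python equivalent of the same semantics (an illustration only, not part of the commit):

    def identity_group_reduce(values, collect):
        # forward every record of the group unchanged
        for value in values:
            collect(value)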

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
deleted file mode 100644
index f80d975..0000000
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.functions;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.data.PythonStreamer;
-import org.apache.flink.util.Collector;
-import java.io.IOException;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-
-/**
- * Multi-purpose class, used for Combine-operations using a python script, and as a preprocess step for
- * GroupReduce-operations.
- *
- * @param <IN>
- */
-public class PythonCombineIdentity<IN> extends RichGroupReduceFunction<IN, IN> {
-	private PythonStreamer streamer;
-
-	public PythonCombineIdentity() {
-		streamer = null;
-	}
-
-	public PythonCombineIdentity(int id) {
-		streamer = new PythonStreamer(this, id);
-	}
-
-	@Override
-	public void open(Configuration config) throws IOException {
-		if (streamer != null) {
-			streamer.open();
-			streamer.sendBroadCastVariables(config);
-		}
-	}
-
-	/**
-	 * Calls the external python function.
-	 *
-	 * @param values function input
-	 * @param out collector
-	 * @throws IOException
-	 */
-	@Override
-	public final void reduce(Iterable<IN> values, Collector<IN> out) throws Exception {
-		for (IN value : values) {
-			out.collect(value);
-		}
-	}
-
-	/**
-	 * Calls the external python function.
-	 *
-	 * @param values function input
-	 * @param out collector
-	 * @throws IOException
-	 */
-	@Override
-	public final void combine(Iterable<IN> values, Collector<IN> out) throws Exception {
-		streamer.streamBufferWithoutGroups(values.iterator(), out);
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (streamer != null) {
-			streamer.close();
-			streamer = null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index 23e39ab..b758c19 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -25,41 +25,35 @@ class GroupReduceFunction(Function.Function):
     def __init__(self):
         super(GroupReduceFunction, self).__init__()
         self._keys = None
-        self._sort_ops = []
-        self._combine = False
-        self._values = []
 
     def _configure(self, input_file, output_file, port, env):
-        if self._combine:
-            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-            self._iterator = Iterator.Iterator(self._connection, env)
-            self._collector = Collector.Collector(self._connection, env)
-            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-            self._run = self._run_combine
+        self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
+        self._iterator = Iterator.Iterator(self._connection, env)
+        if self._keys is None:
+            self._run = self._run_all_group_reduce
         else:
-            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-            self._iterator = Iterator.Iterator(self._connection, env)
+            self._run = self._run_grouped_group_reduce
             self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
-            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-            self._configure_chain(Collector.Collector(self._connection, env))
-        self._open()
-
-    def _open(self):
-        if self._keys is None:
-            self._extract_keys = self._extract_keys_id
-
-    def _close(self):
-        self._sort_and_combine()
-        self._collector._close()
+        self._collector = Collector.Collector(self._connection, env)
+        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
 
     def _set_grouping_keys(self, keys):
         self._keys = keys
 
-    def _set_sort_ops(self, ops):
-        self._sort_ops = ops
+    def _run(self):
+        pass
+
+    def _run_all_group_reduce(self):
+        collector = self._collector
+        function = self.reduce
+        iterator = self._iterator
+        result = function(iterator, collector)
+        if result is not None:
+            for value in result:
+                collector.collect(value)
+        collector._close()
 
-    def _run(self):#reduce
-        connection = self._connection
+    def _run_grouped_group_reduce(self):
         collector = self._collector
         function = self.reduce
         iterator = self._group_iterator
@@ -71,54 +65,6 @@ class GroupReduceFunction(Function.Function):
                 for value in result:
                     collector.collect(value)
         collector._close()
-        connection.send_end_signal()
-
-    def _run_combine(self):#unchained combine
-        connection = self._connection
-        collector = self._collector
-        function = self.combine
-        iterator = self._iterator
-        while 1:
-            result = function(iterator, collector)
-            if result is not None:
-                for value in result:
-                    collector.collect(value)
-            connection.send_end_signal()
-            connection.reset()
-
-    def collect(self, value):#chained combine
-        self._values.append(value)
-        if len(self._values) > 1000:
-            self._sort_and_combine()
-
-    def _sort_and_combine(self):
-        values = self._values
-        function = self.combine
-        collector = self._collector
-        extractor = self._extract_keys
-        grouping = defaultdict(list)
-        for value in values:
-            grouping[extractor(value)].append(value)
-        keys = list(grouping.keys())
-        keys.sort()
-        for key in keys:
-            values = grouping[key]
-            for op in reversed(self._sort_ops):
-                values.sort(key=lambda x:x[op[0]], reverse = op[1] == Order.DESCENDING)
-            result = function(Iterator.ListIterator(values), collector)
-            if result is not None:
-                for res in result:
-                    collector.collect(res)
-        self._values = []
-
-    def _extract_keys(self, x):
-        return tuple([x[k] for k in self._keys])
-
-    def _extract_sort_keys(self, x):
-        return tuple(x[k] for k in self._sort_keys)
-
-    def _extract_keys_id(self, x):
-        return x
 
     def reduce(self, iterator, collector):
         pass
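
With the combiner gone, a user-defined group reduce overrides only reduce(); there is no combine() hook left to implement. A minimal sketch of a UDF under the new model (the class name and logic are invented here; the import path is assumed from the Flink Python examples):

    from flink.functions.GroupReduceFunction import GroupReduceFunction

    class ConcatGroupReduce(GroupReduceFunction):
        def reduce(self, iterator, collector):
            # concatenate field 2 (the string field) of every record in the group
            result = ""
            for value in iterator:
                result += value[2]
            collector.collect(result)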

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index 4d19c13..45a22da 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -24,48 +24,25 @@ class ReduceFunction(Function.Function):
     def __init__(self):
         super(ReduceFunction, self).__init__()
         self._keys = None
-        self._combine = False
-        self._values = []
 
     def _configure(self, input_file, output_file, port, env):
-        if self._combine:
-            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-            self._iterator = Iterator.Iterator(self._connection, env)
-            self._collector = Collector.Collector(self._connection, env)
-            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
-            self._run = self._run_combine
+        self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
+        self._iterator = Iterator.Iterator(self._connection, env)
+        if self._keys is None:
+            self._run = self._run_all_reduce
         else:
-            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
-            self._iterator = Iterator.Iterator(self._connection, env)
-            if self._keys is None:
-                self._run = self._run_allreduce
-            else:
-                self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
-            self._configure_chain(Collector.Collector(self._connection, env))
-            self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+            self._run = self._run_grouped_reduce
+            self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
+        self._collector = Collector.Collector(self._connection, env)
+        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
 
     def _set_grouping_keys(self, keys):
         self._keys = keys
 
-    def _close(self):
-        self._sort_and_combine()
-        self._collector._close()
-
-    def _run(self):#grouped reduce
-        collector = self._collector
-        function = self.reduce
-        iterator = self._group_iterator
-        iterator._init()
-        while iterator.has_group():
-            iterator.next_group()
-            if iterator.has_next():
-                base = iterator.next()
-                for value in iterator:
-                    base = function(base, value)
-            collector.collect(base)
-        collector._close()
+    def _run(self):
+        pass
 
-    def _run_allreduce(self):#ungrouped reduce
+    def _run_all_reduce(self):
         collector = self._collector
         function = self.reduce
         iterator = self._iterator
@@ -76,45 +53,19 @@ class ReduceFunction(Function.Function):
             collector.collect(base)
         collector._close()
 
-    def _run_combine(self):#unchained combine
-        connection = self._connection
+    def _run_grouped_reduce(self):
         collector = self._collector
-        function = self.combine
-        iterator = self._iterator
-        while 1:
+        function = self.reduce
+        iterator = self._group_iterator
+        iterator._init()
+        while iterator.has_group():
+            iterator.next_group()
             if iterator.has_next():
                 base = iterator.next()
-                while iterator.has_next():
-                    base = function(base, iterator.next())
-            collector.collect(base)
-            connection.send_end_signal()
-            connection.reset()
-
-    def collect(self, value):#chained combine
-        self._values.append(value)
-        if len(self._values) > 1000:
-            self._sort_and_combine()
-
-    def _sort_and_combine(self):
-        values = self._values
-        function = self.combine
-        collector = self._collector
-        extractor = self._extract_keys
-        grouping = defaultdict(list)
-        for value in values:
-            grouping[extractor(value)].append(value)
-        keys = list(grouping.keys())
-        keys.sort()
-        for key in keys:
-            iterator = Iterator.ListIterator(grouping[key])
-            base = iterator.next()
-            while iterator.has_next():
-                base = function(base, iterator.next())
+                for value in iterator:
+                    base = function(base, value)
             collector.collect(base)
-        self._values = []
-
-    def _extract_keys(self, x):
-        return tuple([x[k] for k in self._keys])
+        collector._close()
 
     def reduce(self, value1, value2):
         pass
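
_run_grouped_reduce folds each key group pairwise through the user's reduce(value1, value2). The same folding pattern as a self-contained sketch, with plain lists standing in for the group iterator (hypothetical data, no Flink machinery involved):

    def fold_groups(groups, reduce_fn):
        # groups: one non-empty list of records per key group
        for values in groups:
            base = values[0]
            for value in values[1:]:
                base = reduce_fn(base, value)
            yield base

    # summing the second field within each group:
    groups = [[(1, 2), (1, 3)], [(2, 5)]]
    print(list(fold_groups(groups, lambda a, b: (a[0], a[1] + b[1]))))
    # prints [(1, 5), (2, 5)]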

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 7ef5488..25ec8b8 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -147,13 +147,8 @@ class Set(object):
         child_set = OperatorSet(self._env, child)
         child.identifier = _Identifier.GROUPREDUCE
         child.parent = self._info
-        child.operator = copy.deepcopy(operator)
-        child.operator._combine = False
-        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.operator = operator
         child.types = types
-        child.combine = combinable
-        child.combineop = operator
-        child.combineop._combine = True
         child.name = "PythonGroupReduce"
         self._info.children.append(child)
         self._env._sets.append(child)
@@ -183,9 +178,6 @@ class ReduceSet(Set):
         child.identifier = _Identifier.REDUCE
         child.parent = self._info
         child.operator = operator
-        child.combineop = operator
-        child.combine = False
-        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
         child.name = "PythonReduce"
         child.types = deduct_output_type(self._info)
         self._info.children.append(child)
@@ -532,18 +524,12 @@ class Grouping(object):
             operator = GroupReduceFunction()
             operator.reduce = f
         operator._set_grouping_keys(self._child_chain[0].keys)
-        operator._set_sort_ops([(x.field, x.order) for x in self._child_chain[1:]])
         child = OperationInfo()
         child_set = OperatorSet(self._env, child)
         child.identifier = _Identifier.GROUPREDUCE
         child.parent = self._info
-        child.operator = copy.deepcopy(operator)
-        child.operator._combine = False
-        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child.operator = operator
         child.types = types
-        child.combine = combinable
-        child.combineop = operator
-        child.combineop._combine = True
         child.name = "PythonGroupReduce"
         self._info.children.append(child)
         self._env._sets.append(child)
@@ -594,12 +580,7 @@ class UnsortedGrouping(Grouping):
         child_set = OperatorSet(self._env, child)
         child.identifier = _Identifier.REDUCE
         child.parent = self._info
-        child.operator = copy.deepcopy(operator)
-        child.operator._combine = False
-        child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
-        child.combine = True
-        child.combineop = operator
-        child.combineop._combine = True
+        child.operator = operator
         child.name = "PythonReduce"
         child.types = deduct_output_type(self._info)
         self._info.children.append(child)
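
User code is otherwise unaffected: reduce_group still accepts the combinable flag, but with child.combine and child.combineop gone it is no longer recorded in the plan. A usage sketch in the style of test_main.py (d4, GroupReduce, Verify and the type constants are placeholders borrowed from that file):

    d4 \
        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL)) \
        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "GroupReduce"), STRING).output()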

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 472592a..a2279c3 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -202,60 +202,35 @@ class Environment(object):
         self._find_chains()
 
     def _find_chains(self):
-        udf = set([_Identifier.MAP, _Identifier.FLATMAP, _Identifier.FILTER, _Identifier.MAPPARTITION,
-                   _Identifier.GROUPREDUCE, _Identifier.REDUCE, _Identifier.COGROUP,
-                   _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST,
-                   _Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT])
-        chainable = set([_Identifier.MAP, _Identifier.FILTER, _Identifier.FLATMAP, _Identifier.GROUPREDUCE, _Identifier.REDUCE])
-        multi_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
+        chainable = set([_Identifier.MAP, _Identifier.FILTER, _Identifier.FLATMAP])
+        dual_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
         x = len(self._sets) - 1
         while x > -1:
             child = self._sets[x]
             child_type = child.identifier
             if child_type in chainable:
                 parent = child.parent
-                parent_type = parent.identifier
-                if len(parent.sinks) == 0:
-                    if child_type == _Identifier.GROUPREDUCE or child_type == _Identifier.REDUCE:
-                        if child.combine:
-                            while parent_type == _Identifier.GROUP or parent_type == _Identifier.SORT:
-                                parent = parent.parent
-                                parent_type = parent.identifier
-                            if parent_type in udf and len(parent.children) == 1:
-                                if parent.operator is not None:
-                                    function = child.combineop
-                                    parent.operator._chain(function)
-                                    child.combine = False
-                                    parent.name += " -> PythonCombine"
-                                    for bcvar in child.bcvars:
-                                        bcvar_copy = copy.deepcopy(bcvar)
-                                        bcvar_copy.parent = parent
-                                        self._broadcast.append(bcvar_copy)
-                    else:
-                        if parent_type in udf and len(parent.children) == 1:
-                            parent_op = parent.operator
-                            if parent_op is not None:
-                                function = child.operator
-                                parent_op._chain(function)
-                                parent.name += " -> " + child.name
-                                parent.types = child.types
-                                for grand_child in child.children:
-                                    if grand_child.identifier in multi_input:
-                                        if grand_child.parent.id == child.id:
-                                            grand_child.parent = parent
-                                        else:
-                                            grand_child.other = parent
-                                    else:
-                                        grand_child.parent = parent
-                                        parent.children.append(grand_child)
-                                parent.children.remove(child)
-                                for sink in child.sinks:
-                                    sink.parent = parent
-                                    parent.sinks.append(sink)
-                                for bcvar in child.bcvars:
-                                    bcvar.parent = parent
-                                    parent.bcvars.append(bcvar)
-                                self._remove_set((child))
+                if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
+                    parent.operator._chain(child.operator)
+                    parent.name += " -> " + child.name
+                    parent.types = child.types
+                    for grand_child in child.children:
+                        if grand_child.identifier in dual_input:
+                            if grand_child.parent.id == child.id:
+                                grand_child.parent = parent
+                            else:
+                                grand_child.other = parent
+                        else:
+                            grand_child.parent = parent
+                            parent.children.append(grand_child)
+                    parent.children.remove(child)
+                    for sink in child.sinks:
+                        sink.parent = parent
+                        parent.sinks.append(sink)
+                    for bcvar in child.bcvars:
+                        bcvar.parent = parent
+                        parent.bcvars.append(bcvar)
+                    self._remove_set((child))
             x -= 1
 
     def _remove_set(self, set):
@@ -331,7 +306,6 @@ class Environment(object):
                     break
                 if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
                     collect(set.types)
-                    collect(set.combine)
                     collect(set.name)
                     break
                 if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
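
_find_chains is likewise much simpler: only MAP, FILTER and FLATMAP remain chaining candidates, so the old reduce-side fusion (the "-> PythonCombine" renaming removed above) is gone entirely. A hypothetical plan illustrating what still fuses; d, MyMapper, MyFilter and MyReducer are stand-ins, not API names:

    # map -> filter can still be fused into a single Python operator;
    # the grouped reduce stays a separate operator and runs behind the
    # Java-side IdentityGroupReduce pre-step.
    d.map(MyMapper(), (INT, STRING)) \
     .filter(MyFilter()) \
     .group_by(0).reduce_group(MyReducer(), (INT, STRING))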

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index faa2215..c47fab5 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -32,13 +32,11 @@ class OperationInfo():
             self.types = None
             self.operator = None
             self.name = None
-            self.combine = False
             self.delimiter_line = "\n"
             self.delimiter_field = ","
             self.write_mode = WriteMode.NO_OVERWRITE
             self.sinks = []
             self.children = []
-            self.combineop = None
             self.path = None
             self.values = []
             self.projections = []

http://git-wip-us.apache.org/repos/asf/flink/blob/54b52c9b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index 16e1a8c..c9bc404 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -177,12 +177,6 @@ if __name__ == "__main__":
     d1 \
         .reduce(Reduce()) \
         .map_partition(Verify([19], "AllReduce"), STRING).output()
-    d4 \
-        .group_by(2).reduce(Reduce2()) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output()
-    d4 \
-        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output()
 
     #GroupReduce
     class GroupReduce(GroupReduceFunction):
@@ -199,32 +193,9 @@ if __name__ == "__main__":
         def reduce(self, iterator, collector):
             for value in iterator:
                 collector.collect(value)
-
-    class GroupReduce3(GroupReduceFunction):
-        def reduce(self, iterator, collector):
-            collector.collect(iterator.next())
-
-        def combine(self, iterator, collector):
-            if iterator.has_next():
-                v1 = iterator.next()
-            if iterator.has_next():
-                v2 = iterator.next()
-            if v1[0] < v2[0]:
-                collector.collect(v1)
-            else:
-                collector.collect(v2)
     d4 \
         .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
         .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
-    d4 \
-        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output()
-    d4 \
-        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output()
-    d5 \
-        .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \
-        .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
 
     #Execution
     env.set_parallelism(1)

