flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/2] flink git commit: [FLINK-4411] [py] Properly propagate chained dual input children
Date Wed, 17 Aug 2016 11:52:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master b19648eb4 -> 84d28ba00


[FLINK-4411] [py] Properly propagate chained dual input children


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10508477
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10508477
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10508477

Branch: refs/heads/master
Commit: 1050847787b416399a6c03c0568969df93ed4822
Parents: b19648e
Author: zentol <chesnay@apache.org>
Authored: Wed Aug 17 12:15:37 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Wed Aug 17 13:51:55 2016 +0200

----------------------------------------------------------------------
 .../flink/python/api/flink/plan/Environment.py       | 15 ++++++++++++---
 .../python/org/apache/flink/python/api/test_main2.py |  4 ++--
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10508477/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 9d08baf..a54dac8 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -219,15 +219,21 @@ class Environment(object):
         dual_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
         x = len(self._sets) - 1
         while x > -1:
+            # CHAIN(parent -> child) -> grand_child
+            # for all intents and purposes the child set ceases to exist; it is merged into the parent
             child = self._sets[x]
             child_type = child.identifier
             if child_type in chainable:
                 parent = child.parent
+                # we can only chain to an actual python udf (=> operator is not None)
+                # we may only chain if the parent has only 1 child
                 if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
                     parent.chained_info = child
                     parent.name += " -> " + child.name
                     parent.types = child.types
+                    # grand_children now belong to the parent
                     for grand_child in child.children:
+                        # dual_input operations have 2 parents; hence we have to change the correct one
                         if grand_child.identifier in dual_input:
                             if grand_child.parent.id == child.id:
                                 grand_child.parent = parent
@@ -235,15 +241,18 @@ class Environment(object):
                                 grand_child.other = parent
                         else:
                             grand_child.parent = parent
-                            parent.children.append(grand_child)
-                    parent.children.remove(child)
+                        parent.children.append(grand_child)
+                    # child sinks now belong to the parent
                     for sink in child.sinks:
                         sink.parent = parent
                         parent.sinks.append(sink)
+                    # child broadcast variables now belong to the parent
                     for bcvar in child.bcvars:
                         bcvar.parent = parent
                         parent.bcvars.append(bcvar)
-                    self._remove_set((child))
+                    # remove child set as it has been merged into the parent
+                    parent.children.remove(child)
+                    self._remove_set(child)
             x -= 1
 
     def _remove_set(self, set):

http://git-wip-us.apache.org/repos/asf/flink/blob/10508477/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index ceb26d0..f1d40e1 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -22,14 +22,14 @@ from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
 from flink.functions.CoGroupFunction import CoGroupFunction
 from flink.functions.Aggregation import Max, Min, Sum
-from utils import Verify, Verify2
+from utils import Verify, Verify2, Id
 
 if __name__ == "__main__":
     env = get_environment()
 
     d1 = env.from_elements(1, 6, 12)
 
-    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False)).map(Id()).map(Id())  # force map chaining
 
     d3 = env.from_elements(("hello",), ("world",))
 


Mime
View raw message