flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/4] flink git commit: [FLINK-4804] [py] Fix first() failing when applied to groupings
Date Fri, 21 Oct 2016 09:03:45 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6f0faf9bb -> 5c83e787c


[FLINK-4804] [py] Fix first() failing when applied to groupings


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c83e787
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c83e787
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c83e787

Branch: refs/heads/master
Commit: 5c83e787c7a4edafa3db34e5a58548728cc27b6c
Parents: 41d5167
Author: zentol <chesnay@apache.org>
Authored: Wed Oct 12 12:19:04 2016 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri Oct 21 11:03:03 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/python/api/PythonPlanBinder.java  | 17 +++++++++++++++--
 .../org/apache/flink/python/api/test_main2.py      |  4 ++++
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c83e787/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index d55b9d4..cc63ef4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -459,9 +459,22 @@ public class PythonPlanBinder {
 				.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("DistinctPostStep"));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createFirstOperation(PythonOperationInfo info) throws IOException {
-		DataSet op = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op.first(info.count).setParallelism(getParallelism(info)).name("First"));
+		Object op = sets.get(info.parentID);
+		if (op instanceof DataSet) {
+			sets.put(info.setID, ((DataSet) op).first(info.count).setParallelism(getParallelism(info)).name("First"));
+			return;
+		}
+		if (op instanceof UnsortedGrouping) {
+			sets.put(info.setID, ((UnsortedGrouping) op).first(info.count).setParallelism(getParallelism(info)).name("First")
+				.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+			return;
+		}
+		if (op instanceof SortedGrouping) {
+			sets.put(info.setID, ((SortedGrouping) op).first(info.count).setParallelism(getParallelism(info)).name("First")
+				.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+		}
 	}
 
 	private void createGroupOperation(PythonOperationInfo info) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c83e787/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index f1d40e1..25b9d29 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -126,6 +126,10 @@ if __name__ == "__main__":
     d1 \
         .first(1) \
         .map_partition(Verify([1], "First")).output()
+    d4 \
+        .group_by(0) \
+        .first(1) \
+        .map_partition(Verify([(1, 0.5, "hello", True), (2, 0.4, "world", False)], "Grouped First")).output()
     d1 \
         .rebalance()
     d6 \


Mime
View raw message