flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-1490][fix][java-api] Fix incorrect local output sorting of nested types with field position keys.
Date Fri, 06 Feb 2015 15:01:43 GMT
Repository: flink
Updated Branches:
  refs/heads/master a0e71b88a -> 6187292bc


[FLINK-1490][fix][java-api] Fix incorrect local output sorting of nested types with field
position keys.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6187292b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6187292b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6187292b

Branch: refs/heads/master
Commit: 6187292bca47e7a42fe73a3f8fead50d522be060
Parents: a0e71b8
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Feb 6 15:11:25 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Feb 6 16:00:50 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/operators/DataSink.java      | 25 ++++++++---
 .../test/javaApiOperators/DataSinkITCase.java   | 44 ++++++++++++++++----
 .../util/CollectionDataSets.java                | 21 ++++++++++
 3 files changed, 76 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 9da5433..e646891 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -115,18 +115,33 @@ public class DataSink<T> {
 			throw new InvalidProgramException("Order key out of tuple bounds.");
 		}
 
+		// get flat keys
+		Keys.ExpressionKeys<T> ek;
+		try {
+			ek = new Keys.ExpressionKeys<T>(new int[]{field}, this.type);
+		} catch(IllegalArgumentException iae) {
+			throw new InvalidProgramException("Invalid specification of field expression.", iae);
+		}
+		int[] flatKeys = ek.computeLogicalKeyPositions();
+
 		if(this.sortKeyPositions == null) {
 			// set sorting info
-			this.sortKeyPositions = new int[] {field};
-			this.sortOrders = new Order[] {order};
+			this.sortKeyPositions = flatKeys;
+			this.sortOrders = new Order[flatKeys.length];
+			Arrays.fill(this.sortOrders, order);
 		} else {
 			// append sorting info to exising info
-			int newLength = this.sortKeyPositions.length + 1;
+			int oldLength = this.sortKeyPositions.length;
+			int newLength = oldLength + flatKeys.length;
 			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
 			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
-			this.sortKeyPositions[newLength-1] = field;
-			this.sortOrders[newLength-1] = order;
+
+			for(int i=0; i<flatKeys.length; i++) {
+				this.sortKeyPositions[oldLength+i] = flatKeys[i];
+				this.sortOrders[oldLength+i] = order;
+			}
 		}
+
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
index d8663e8..6bd678f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
@@ -208,8 +208,8 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds =
-				CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
+		DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
+				CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
 		ds.writeAsText(resultPath)
 				.sortLocalOutput("f0.f1", Order.ASCENDING)
 				.sortLocalOutput("f1", Order.DESCENDING)
@@ -218,13 +218,39 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 		env.execute();
 
 		expected =
-				"((2,1),a)\n" +
-				"((2,2),b)\n" +
-				"((1,2),a)\n" +
-				"((3,3),c)\n" +
-				"((1,3),a)\n" +
-				"((3,6),c)\n" +
-				"((4,9),c)\n";
+				"((2,1),a,3)\n" +
+				"((2,2),b,4)\n" +
+				"((1,2),a,1)\n" +
+				"((3,3),c,5)\n" +
+				"((1,3),a,2)\n" +
+				"((3,6),c,6)\n" +
+				"((4,9),c,7)\n";
+
+		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
+	}
+
+	@Test
+	public void testTupleSortingNestedDOP1_2() throws Exception {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
+				CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
+		ds.writeAsText(resultPath)
+				.sortLocalOutput(1, Order.ASCENDING)
+				.sortLocalOutput(2, Order.DESCENDING)
+				.setParallelism(1);
+
+		env.execute();
+
+		expected =
+				"((2,1),a,3)\n" +
+				"((1,3),a,2)\n" +
+				"((1,2),a,1)\n" +
+				"((2,2),b,4)\n" +
+				"((4,9),c,7)\n" +
+				"((3,6),c,6)\n" +
+				"((3,3),c,5)\n";
 
 		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6187292b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 632e7c0..ef6b8a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -184,6 +184,27 @@ public class CollectionDataSets {
 		return env.fromCollection(data, type);
 	}
 
+	public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
+
+		List<Tuple3<Tuple2<Integer, Integer>, String, Integer>> data = new ArrayList<Tuple3<Tuple2<Integer, Integer>, String, Integer>>();
+		data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(1, 3), "a", 2));
+		data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(1, 2), "a", 1));
+		data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(2, 1), "a", 3));
+		data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(2, 2), "b", 4));
+		data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(3, 3), "c", 5));
+		data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(3, 6), "c", 6));
+		data.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(new Tuple2<Integer, Integer>(4, 9), "c", 7));
+
+		TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>> type = new
+				TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>>(
+				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
 	public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {
 
 		List<String> data = new ArrayList<String>();


Mime
View raw message