flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [5/5] flink git commit: [FLINK-1664] Adds check if a selected sort key is sortable
Date Fri, 03 Apr 2015 19:33:14 GMT
[FLINK-1664] Adds check if a selected sort key is sortable

This closes #541


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f36eb54e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f36eb54e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f36eb54e

Branch: refs/heads/master
Commit: f36eb54ee6d8cc130439def98559b6b0a70b6c7b
Parents: f39aec8
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Mar 27 21:37:59 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Apr 3 20:42:05 2015 +0200

----------------------------------------------------------------------
 .../api/common/typeinfo/TypeInformation.java    |  10 +-
 .../api/common/typeutils/CompositeType.java     |  20 ++
 .../flink/api/java/SortPartitionOperator.java   |  30 +-
 .../flink/api/java/operators/DataSink.java      |  28 ++
 .../apache/flink/api/java/operators/Keys.java   |  14 +-
 .../api/java/operators/SortedGrouping.java      |  71 +++--
 .../flink/api/java/typeutils/PojoTypeInfo.java  |   7 +-
 .../api/java/typeutils/TupleTypeInfoBase.java   |  19 --
 .../flink/api/java/operator/DataSinkTest.java   |  46 +--
 .../flink/api/java/operator/GroupingTest.java   | 278 +++++++++++++++++--
 .../api/java/operator/SortPartitionTest.java    | 204 ++++++++++++++
 11 files changed, 637 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 4fa02e3..bb50e32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -132,7 +132,15 @@ public abstract class TypeInformation<T> implements Serializable {
 	 * @return True, if the type can be used as a key, false otherwise.
 	 */
 	public abstract boolean isKeyType();
-	
+
+	/**
+	 * Checks whether this type can be used as a key for sorting.
+	 * The order produced by sorting this type must be meaningful.
+	 */
+	public boolean isSortKeyType() {
+		return isKeyType();
+	}
+
 	/**
 	 * Creates a serializer for the type. The serializer may use the ExecutionConfig
 	 * for parameterization.

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 54a1e13..de39ec8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -169,6 +169,26 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 		return getFieldIndex(fieldName) >= 0;
 	}
 
+	@Override
+	public boolean isKeyType() {
+		for(int i=0;i<this.getArity();i++) {
+			if (!this.getTypeAt(i).isKeyType()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public boolean isSortKeyType() {
+		for(int i=0;i<this.getArity();i++) {
+			if (!this.getTypeAt(i).isSortKeyType()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
 	/**
 	 * Returns the names of the composite fields of this type. The order of the returned array
must
 	 * be consistent with the internal field index ordering.

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
index c8f8bbc..988144b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/SortPartitionOperator.java
@@ -24,9 +24,11 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.SingleInputOperator;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 
 import java.util.Arrays;
 
@@ -96,11 +98,23 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 
 	private int[] getFlatFields(int field) {
 
+		if(!(super.getType() instanceof TupleTypeInfoBase<?>)) {
+			throw new InvalidProgramException("Field positions can only be specified on Tuple or "
+
+					"Case Class types.");
+		}
+		else {
+			// check selected field is sortable type
+			TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) super.getType()).getTypeAt(field);
+			if (!sortKeyType.isSortKeyType()) {
+				throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+			}
+		}
+
 		Keys.ExpressionKeys<T> ek;
 		try {
 			ek = new Keys.ExpressionKeys<T>(new int[]{field}, super.getType());
 		} catch(IllegalArgumentException iae) {
-			throw new InvalidProgramException("Invalid specification of field expression.", iae);
+			throw new InvalidProgramException("Invalid specification of field position.", iae);
 		}
 		return ek.computeLogicalKeyPositions();
 	}
@@ -108,6 +122,13 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 	private int[] getFlatFields(String fields) {
 
 		if(super.getType() instanceof CompositeType) {
+
+			// check selected field is sortable type
+			TypeInformation<?> sortKeyType = ((CompositeType<?>) super.getType()).getTypeAt(fields);
+			if (!sortKeyType.isSortKeyType()) {
+				throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+			}
+
 			// compute flat field positions for (nested) sorting fields
 			Keys.ExpressionKeys<T> ek;
 			try {
@@ -123,6 +144,12 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 				throw new InvalidProgramException("Output sorting of non-composite types can only be
defined on the full type. " +
 						"Use a field wildcard for that (\"*\" or \"_\")");
 			} else {
+
+				// check if selected field is sortable type
+				if (!super.getType().isSortKeyType()) {
+					throw new InvalidProgramException("Selected sort key cannot be sorted: " + super.getType());
+				}
+
 				return new int[]{0};
 			}
 		}
@@ -149,7 +176,6 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 		}
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 83ec021..5b5b031 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Nothing;
 import org.apache.flink.api.java.DataSet;
@@ -114,6 +115,7 @@ public class DataSink<T> {
 		if (field >= this.type.getArity()) {
 			throw new InvalidProgramException("Order key out of tuple bounds.");
 		}
+		isValidSortKeyType(field);
 
 		// get flat keys
 		Keys.ExpressionKeys<T> ek;
@@ -166,9 +168,11 @@ public class DataSink<T> {
 		Order[] orders;
 
 		if(this.type instanceof CompositeType) {
+
 			// compute flat field positions for (nested) sorting fields
 			Keys.ExpressionKeys<T> ek;
 			try {
+				isValidSortKeyType(fieldExpression);
 				ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type);
 			} catch(IllegalArgumentException iae) {
 				throw new InvalidProgramException("Invalid specification of field expression.", iae);
@@ -183,6 +187,8 @@ public class DataSink<T> {
 				throw new InvalidProgramException("Output sorting of non-composite types can only be
defined on the full type. " +
 						"Use a field wildcard for that (\"*\" or \"_\")");
 			} else {
+				isValidSortKeyType(fieldExpression);
+
 				numFields = 1;
 				fields = new int[]{0};
 				orders = new Order[]{order};
@@ -208,6 +214,28 @@ public class DataSink<T> {
 		return this;
 	}
 
+	private void isValidSortKeyType(int field) {
+		TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) this.type).getTypeAt(field);
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+		}
+	}
+
+	private void isValidSortKeyType(String field) {
+		TypeInformation<?> sortKeyType;
+
+		field = field.trim();
+		if(field.equals("*") || field.equals("_")) {
+			sortKeyType = this.type;
+		} else {
+			sortKeyType = ((CompositeType<?>) this.type).getTypeAt(field);
+		}
+
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+		}
+	}
+
 	/**
 	 * @return Configuration for the OutputFormat.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 2c067fd..a2cde07 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -82,19 +82,19 @@ public abstract class Keys<T> {
 
 			this.keyExtractor = keyExtractor;
 			this.keyType = keyType;
-			
+
+			if(!keyType.isKeyType()) {
+				throw new InvalidProgramException("Return type "+keyType+" of KeySelector "+keyExtractor.getClass()+"
is not a valid key type");
+			}
+
 			// we have to handle a special case here:
-			// if the keyType is a tuple type, we need to select the full tuple with all its fields.
-			if(keyType.isTupleType()) {
+			// if the keyType is a composite type, we need to select the full type with all its fields.
+			if(keyType instanceof CompositeType) {
 				ExpressionKeys<K> ek = new ExpressionKeys<K>(new String[] {ExpressionKeys.SELECT_ALL_CHAR},
keyType);
 				logicalKeyFields = ek.computeLogicalKeyPositions();
 			} else {
 				logicalKeyFields = new int[] {0};
 			}
-
-			if (!this.keyType.isKeyType()) {
-				throw new IllegalArgumentException("Invalid type of KeySelector keys");
-			}
 		}
 
 		public TypeInformation<K> getKeyType() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 287bf82..4c6c952 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import com.google.common.base.Preconditions;
@@ -63,6 +64,8 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (field >= dataSet.getType().getArity()) {
 			throw new IllegalArgumentException("Order key out of tuple bounds.");
 		}
+		isValidSortKeyType(field);
+
 		// use int-based expression key to properly resolve nested tuples for grouping
 		ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType());
 		this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
@@ -79,6 +82,8 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (!(dataSet.getType() instanceof CompositeType)) {
 			throw new InvalidProgramException("Specifying order keys via field positions is only valid
for composite data types (pojo / tuple / case class)");
 		}
+		isValidSortKeyType(field);
+
 		// resolve String-field to int using the expression keys
 		ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType());
 		this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
@@ -95,6 +100,10 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (!(this.keys instanceof Keys.SelectorFunctionKeys)) {
 			throw new InvalidProgramException("Sorting on KeySelector keys only works with KeySelector
grouping.");
 		}
+		TypeInformation<?> sortKeyType = keySelector.getKeyType();
+		if(!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Key type " + sortKeyType +" is not sortable.");
+		}
 
 		this.groupSortKeyPositions = keySelector.computeLogicalKeyPositions();
 		for (int i = 0; i < groupSortKeyPositions.length; i++) {
@@ -218,35 +227,22 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (field >= dataSet.getType().getArity()) {
 			throw new IllegalArgumentException("Order key out of tuple bounds.");
 		}
+		isValidSortKeyType(field);
+
 		ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType());
 		addSortGroupInternal(ek, order);
 		return this;
 	}
-	
-	private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
-		Preconditions.checkArgument(order != null, "Order can not be null");
-		int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
-		
-		int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length;
-		this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
-		this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
-		int pos = newLength - additionalKeyPositions.length;
-		int off = newLength - additionalKeyPositions.length;
-		for(;pos < newLength; pos++) {
-			this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off];
-			this.groupSortOrders[pos] = order; // use the same order
-		}
-	}
-	
+
 	/**
 	 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements within a group on
the specified field in the specified {@link Order}.</br>
 	 * <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br/>
 	 * Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)}
calls.
-	 * 
+	 *
 	 * @param field The Tuple or Pojo field on which the group is sorted.
 	 * @param order The Order in which the specified field is sorted.
 	 * @return A SortedGrouping with specified order of group element.
-	 * 
+	 *
 	 * @see org.apache.flink.api.java.tuple.Tuple
 	 * @see Order
 	 */
@@ -257,9 +253,48 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (! (dataSet.getType() instanceof CompositeType)) {
 			throw new InvalidProgramException("Specifying order keys via field positions is only valid
for composite data types (pojo / tuple / case class)");
 		}
+		isValidSortKeyType(field);
+
 		ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType());
 		addSortGroupInternal(ek, order);
 		return this;
 	}
+	
+	private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
+		Preconditions.checkArgument(order != null, "Order can not be null");
+		int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
+		
+		int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length;
+		this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
+		this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
+		int pos = newLength - additionalKeyPositions.length;
+		int off = newLength - additionalKeyPositions.length;
+		for(;pos < newLength; pos++) {
+			this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off];
+			this.groupSortOrders[pos] = order; // use the same order
+		}
+	}
+
+	private void isValidSortKeyType(int field) {
+		TypeInformation<?> sortKeyType = ((TupleTypeInfoBase<?>) dataSet.getType()).getTypeAt(field);
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+		}
+	}
+
+	private void isValidSortKeyType(String field) {
+		TypeInformation<?> sortKeyType;
+
+		field = field.trim();
+		if(field.equals("*") || field.equals("_")) {
+			sortKeyType = this.getDataSet().getType();
+		} else {
+			sortKeyType = ((CompositeType<?>) this.getDataSet().getType()).getTypeAt(field);
+		}
+
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Selected sort key is not a sortable type " + sortKeyType);
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 1dcee24..2f3db7c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -118,8 +118,11 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	}
 
 	@Override
-	public boolean isKeyType() {
-		return Comparable.class.isAssignableFrom(typeClass);
+	public boolean isSortKeyType() {
+		// Support for sorting POJOs that implement Comparable is not implemented yet.
+		// Since the order of fields in a POJO type is not well defined, sorting on fields
+		//   gives only some undefined order.
+		return false;
 	}
 	
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index d1c2c9d..5051449 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -223,11 +223,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 	}
 	
 	@Override
-	public boolean isKeyType() {
-		return isValidKeyType(this);
-	}
-
-	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof TupleTypeInfoBase) {
 			@SuppressWarnings("unchecked")
@@ -245,20 +240,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 		return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
 	}
 
-	private boolean isValidKeyType(TypeInformation<?> typeInfo) {
-		if(typeInfo instanceof TupleTypeInfoBase) {
-			TupleTypeInfoBase<?> tupleType = ((TupleTypeInfoBase<?>)typeInfo);
-			for(int i=0;i<tupleType.getArity();i++) {
-				if (!isValidKeyType(tupleType.getTypeAt(i))) {
-					return false;
-				}
-			}
-			return true;
-		} else  {
-			return typeInfo.isKeyType();
-		}
-	}
-
 	@Override
 	public String toString() {
 		StringBuilder bld = new StringBuilder("Tuple");

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
index 7a7ed14..37ad381 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -256,23 +256,6 @@ public class DataSinkTest {
 	}
 
 	@Test
-	public void testPojoSingleOrderFull() {
-
-		final ExecutionEnvironment env = ExecutionEnvironment
-				.getExecutionEnvironment();
-		DataSet<CustomType> pojoDs = env
-				.fromCollection(pojoData);
-
-		// should work
-		try {
-			pojoDs.writeAsText("/tmp/willNotHappen")
-					.sortLocalOutput("*", Order.ASCENDING);
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
-
-	@Test
 	public void testPojoTwoOrder() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment
@@ -317,6 +300,35 @@ public class DataSinkTest {
 				.sortLocalOutput("notThere", Order.DESCENDING);
 	}
 
+	@Test(expected = InvalidProgramException.class)
+	public void testPojoSingleOrderFull() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<CustomType> pojoDs = env
+				.fromCollection(pojoData);
+
+		// must not work
+		pojoDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput("*", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testArrayOrderFull() {
+
+		List<Object[]> arrayData = new ArrayList<Object[]>();
+		arrayData.add(new Object[0]);
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Object[]> pojoDs = env
+				.fromCollection(arrayData);
+
+		// must not work
+		pojoDs.writeAsText("/tmp/willNotHappen")
+				.sortLocalOutput("*", Order.ASCENDING);
+	}
+
 	/**
 	 * Custom data type, for testing purposes.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index c958680..314695f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -24,13 +24,16 @@ import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,11 +51,23 @@ public class GroupingTest {
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
-	
+
+	private final TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomInfo
= new
+			TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				TypeExtractor.createTypeInfo(CustomType.class),
+				BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+			);
+
 	// LONG DATA
 	private final List<Long> emptyLongData = new ArrayList<Long>();
 	
 	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+
+	private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData
=
+			new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
+
 	
 	@Test  
 	public void testGroupByKeyFields1() {
@@ -187,7 +202,6 @@ public class GroupingTest {
 		// should not work, key out of tuple bounds
 		ds.groupBy("nested.myNonExistent");
 	}
-
 	
 	@Test
 	@SuppressWarnings("serial")
@@ -233,41 +247,67 @@ public class GroupingTest {
 			Assert.fail();
 		}
 	}
-	
-	@Test(expected=IllegalArgumentException.class)
+
+	@Test
 	@SuppressWarnings("serial")
 	public void testGroupByKeySelector3() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
-		
-		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
-		// should not work
-		customDs.groupBy(
-				new KeySelector<GroupingTest.CustomType, CustomType>() {
-					@Override
-					public CustomType getKey(CustomType value) {
-						return value;
-				}
-		});
+
+		try {
+			DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+			// should not work
+			customDs.groupBy(
+					new KeySelector<GroupingTest.CustomType, CustomType>() {
+						@Override
+						public CustomType getKey(CustomType value) {
+							return value;
+						}
+					});
+		} catch(Exception e) {
+			Assert.fail();
+		}
 	}
-	
-	@Test(expected=IllegalArgumentException.class)
+
+	@Test
 	@SuppressWarnings("serial")
 	public void testGroupByKeySelector4() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
-		
+
+		try {
+			DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+			// should not work
+			customDs.groupBy(
+					new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>()
{
+						@Override
+						public Tuple2<Integer, CustomType> getKey(CustomType value) {
+							return new Tuple2<Integer, CustomType>(value.myInt, value);
+						}
+					});
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	@SuppressWarnings("serial")
+	public void testGroupByKeySelector5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		this.customTypeData.add(new CustomType());
+
 		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 		// should not work
 		customDs.groupBy(
-				new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>()
{
+				new KeySelector<GroupingTest.CustomType, CustomType2>() {
 					@Override
-					public Tuple2<Integer, CustomType> getKey(CustomType value) {
-						return new Tuple2<Integer, CustomType>(value.myInt, value);
-				}
-		});
+					public CustomType2 getKey(CustomType value) {
+						return new CustomType2();
+					}
+				});
 	}
 	
 	@Test
@@ -313,6 +353,30 @@ public class GroupingTest {
 		}).sortGroup(0, Order.ASCENDING);
 		
 	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortKeyFields4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(0)
+				.sortGroup(2, Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortKeyFields5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(0)
+				.sortGroup(3, Order.ASCENDING);
+	}
 	
 	@Test
 	public void testChainedGroupSortKeyFields() {
@@ -327,7 +391,166 @@ public class GroupingTest {
 			Assert.fail();
 		}
 	}
-	
+
+	@Test
+	public void testGroupSortByKeyExpression1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy("f0").sortGroup("f1", Order.ASCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testGroupSortByKeyExpression2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy("f0").sortGroup("f2.myString", Order.ASCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testGroupSortByKeyExpression3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work
+		try {
+			tupleDs.groupBy("f0")
+					.sortGroup("f2.myString", Order.ASCENDING)
+					.sortGroup("f1", Order.DESCENDING);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeyExpression4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy("f0")
+				.sortGroup("f2", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeyExpression5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy("f0")
+				.sortGroup("f1", Order.ASCENDING)
+				.sortGroup("f2", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeyExpression6() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy("f0")
+				.sortGroup("f3", Order.ASCENDING);
+	}
+
+	@SuppressWarnings("serial")
+	@Test
+	public void testGroupSortByKeySelector1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(
+				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+					@Override
+					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception
{
+						return value.f1;
+					}
+				})
+				.sortGroup(
+						new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Integer>() {
+							@Override
+							public Integer getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws
Exception {
+								return value.f0;
+							}
+						}, Order.ASCENDING);
+	}
+
+	@SuppressWarnings("serial")
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(
+				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+					@Override
+					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception
{
+						return value.f1;
+					}
+				})
+				.sortGroup(
+						new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, CustomType>()
{
+							@Override
+							public CustomType getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws
Exception {
+								return value.f2;
+							}
+						}, Order.ASCENDING);
+	}
+
+	@SuppressWarnings("serial")
+	@Test(expected = InvalidProgramException.class)
+	public void testGroupSortByKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
+				env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should not work
+		tupleDs.groupBy(
+				new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+					@Override
+					public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception
{
+						return value.f1;
+					}
+				})
+				.sortGroup(
+						new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long[]>() {
+							@Override
+							public Long[] getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws
Exception {
+								return value.f3;
+							}
+						}, Order.ASCENDING);
+	}
+
 
 	public static class CustomType implements Serializable {
 		
@@ -354,4 +577,11 @@ public class GroupingTest {
 			return myInt+","+myLong+","+myString;
 		}
 	}
+
+	public static class CustomType2 implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public int[] myIntArray;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f36eb54e/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
new file mode 100644
index 0000000..a4e2bbc
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that {@link DataSet#sortPartition} accepts sortable keys (atomic fields, including
+ * atomic fields nested in a composite type) and rejects non-sortable keys (composite types
+ * and arrays) with an {@link InvalidProgramException}.
+ */
+public class SortPartitionTest {
+
+	// TUPLE DATA: all fields are of basic (atomic) types
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo =
+			new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	// TUPLE DATA WITH NON-SORTABLE FIELDS: f2 is the composite CustomType, f3 is a Long[]
+	private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData =
+			new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
+
+	private final TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomInfo =
+			new TupleTypeInfo<Tuple4<Integer, Long, CustomType, Long[]>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				TypeExtractor.createTypeInfo(CustomType.class),
+				BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
+			);
+
+	@Test
+	public void testSortPartitionPositionKeys1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.sortPartition(0, Order.ASCENDING);
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception: " + e);
+		}
+	}
+
+	@Test
+	public void testSortPartitionPositionKeys2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work: chained sorts on two atomic fields
+		try {
+			tupleDs
+					.sortPartition(0, Order.ASCENDING)
+					.sortPartition(3, Order.DESCENDING);
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception: " + e);
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testSortPartitionWithPositionKeys3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// must not work: field 2 is the composite CustomType
+		tupleDs.sortPartition(2, Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testSortPartitionWithPositionKeys4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// must not work: field 3 is a Long[] array
+		tupleDs.sortPartition(3, Order.ASCENDING);
+	}
+
+	@Test
+	public void testSortPartitionExpressionKeys1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.sortPartition("f1", Order.ASCENDING);
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception: " + e);
+		}
+	}
+
+	@Test
+	public void testSortPartitionExpressionKeys2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// should work: "f2.nested.myInt" is an atomic field nested inside a composite type
+		try {
+			tupleDs
+					.sortPartition("f0", Order.ASCENDING)
+					.sortPartition("f2.nested.myInt", Order.DESCENDING);
+		} catch (Exception e) {
+			Assert.fail("Unexpected exception: " + e);
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testSortPartitionWithExpressionKeys3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// must not work: "f2.nested" selects a composite type
+		tupleDs.sortPartition("f2.nested", Order.ASCENDING);
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testSortPartitionWithExpressionKeys4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
+
+		// must not work: "f3" selects a Long[] array
+		tupleDs.sortPartition("f3", Order.ASCENDING);
+	}
+
+	public static class CustomType implements Serializable {
+		// nested composite type: "f2.nested" is not a valid sort key, "f2.nested.myInt" is
+		public static class Nest {
+			public int myInt;
+		}
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public long myLong;
+		public String myString;
+		public Nest nested;
+
+		public CustomType() {}
+
+		public CustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+
+		@Override
+		public String toString() {
+			return myInt+","+myLong+","+myString;
+		}
+	}
+
+	public static class CustomType2 implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public int[] myIntArray;
+	}
+}


Mime
View raw message