flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] git commit: [FLINK-925] Support KeySelector function returning Tuple types
Date Mon, 08 Sep 2014 14:18:52 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master c0c2abda5 -> 122c9b023


[FLINK-925] Support KeySelector function returning Tuple types


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fb3bdeac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fb3bdeac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fb3bdeac

Branch: refs/heads/master
Commit: fb3bdeac0b0c73e905945a1ecdbf29bf83ba3a6e
Parents: c0c2abd
Author: TobiasWiens <tobwiens@gmail.com>
Authored: Sun Jul 6 17:47:00 2014 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Sep 8 16:17:32 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |   3 +-
 .../flink/api/java/operators/JoinOperator.java  |   2 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java |  17 +-
 .../java/typeutils/runtime/TupleComparator.java |   3 +-
 .../runtime/TupleComparatorTTT1Test.java        | 155 +++++++++++++++++++
 .../runtime/TupleComparatorTTT2Test.java        | 153 ++++++++++++++++++
 .../runtime/TupleComparatorTTT3Test.java        | 154 ++++++++++++++++++
 7 files changed, 481 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index ca2a5e9..4688349 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -456,7 +456,8 @@ public abstract class DataSet<T> {
 	 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 	 * @see DataSet
 	 */
-	public <K extends Comparable<K>> UnsortedGrouping<T> groupBy(KeySelector<T,
K> keyExtractor) {
+
+	public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor)
{
 		return new UnsortedGrouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor,
getType()));
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 2efe7e9..1ca2ec9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -756,7 +756,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @see KeySelector
 		 * @see DataSet
 		 */
-		public <K extends Comparable<K>> JoinOperatorSetsPredicate where(KeySelector<I1,
K> keySelector) {
+		public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector)
{
 			return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keySelector,
input1.getType()));
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 737be56..94d3252 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -120,7 +120,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T>
implement
 		}
 		
 		// special case for tuples where field zero is the key field
-		if (logicalKeyFields.length == 1 && logicalKeyFields[0] == 0) {
+		if (logicalKeyFields.length == 1 && logicalKeyFields[0] == 0 && !types[0].isTupleType())
{
 			return createLeadingFieldComparator(orders[0], types[0]);
 		}
 		
@@ -141,8 +141,21 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T>
implement
 			int keyPos = logicalKeyFields[i];
 			if (types[keyPos].isKeyType() && types[keyPos] instanceof AtomicType) {
 				fieldComparators[i] = ((AtomicType<?>) types[keyPos]).createComparator(orders[i]);
+			} else if(types[keyPos].isTupleType() && types[keyPos] instanceof TupleTypeInfo){
// Check for tuple
+				TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) types[keyPos];
+				
+				// All fields are key
+				int[] allFieldsKey = new int[tupleType.types.length];
+				for(int h = 0; h < tupleType.types.length; h++){
+					allFieldsKey[h]=h;
+				}
+				
+				// Prepare order
+				boolean[] tupleOrders = new boolean[tupleType.types.length];
+				Arrays.fill(tupleOrders, orders[i]);
+				fieldComparators[i] = tupleType.createComparator(allFieldsKey, tupleOrders);
 			} else {
-				throw new IllegalArgumentException("The field at position " + i + " (" + types[keyPos]
+ ") is no atomic key type.");
+				throw new IllegalArgumentException("The field at position " + i + " (" + types[keyPos]
+ ") is no atomic key type nor tuple type.");
 			}
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 48cf08b..9de9824 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -236,8 +236,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T>
im
 		try {
 			for (; i < keyPositions.length; i++) {
 				int keyPos = keyPositions[i];
-				@SuppressWarnings("unchecked")
-				int cmp = comparators[i].compare((T)first.getFieldNotNull(keyPos), (T)second.getFieldNotNull(keyPos));
+				int cmp = comparators[i].compare(first.getFieldNotNull(keyPos), second.getFieldNotNull(keyPos));
 
 				if (cmp != 0) {
 					return cmp;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
new file mode 100644
index 0000000..d406529
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorTTT1Test extends TupleComparatorTestBase<Tuple3<Tuple2<String,
Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
+
+	@SuppressWarnings("unchecked")
+	Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[]
dataISD = new Tuple3[]{
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L,
1L), new Tuple2<Integer, Long>(4, -10L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L,
2L), new Tuple2<Integer, Long>(4, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L,
3L), new Tuple2<Integer, Long>(4, 0L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L,
4L), new Tuple2<Integer, Long>(4, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L,
5L), new Tuple2<Integer, Long>(4, 15L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L,
4L), new Tuple2<Integer, Long>(45, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L,
6L), new Tuple2<Integer, Long>(45, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L,
8L), new Tuple2<Integer, Long>(323, 2L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L,
9L), new Tuple2<Integer, Long>(323, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L,
123L), new Tuple2<Integer, Long>(555, 1L))
+		
+	};
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>> createComparator(
+			boolean ascending) {
+		return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>>(
+				new int[] { 0 },
+				new TypeComparator[] {
+						new TupleComparator<Tuple2<String, Double>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+								new StringComparator(ascending),
+								new DoubleComparator(ascending) },
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						new TupleComparator<Tuple2<Long, Long>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+								new LongComparator(ascending),
+								new LongComparator(ascending) },
+								new TypeSerializer[] {
+										LongSerializer.INSTANCE,
+										LongSerializer.INSTANCE }),
+						new TupleComparator<Tuple2<Integer, Long>>(
+								new int[] {	0, 1 },
+								new TypeComparator[] {
+								new IntComparator(ascending),
+								new LongComparator(ascending) },
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) },
+				new TypeSerializer[] {
+						new TupleSerializer<Tuple2<String, Double>>(
+								(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Long, Long>>(
+								(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										LongSerializer.INSTANCE,
+										LongSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Integer, Long>>(
+								(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) });
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>> createSerializer() {
+		return new  TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long,
Long>, Tuple2<Integer, Long>>>(
+				(Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>>) (Class<?>) Tuple3.class,
+				new TypeSerializer[]{
+					new TupleSerializer<Tuple2<String, Double>> (
+							(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									StringSerializer.INSTANCE,
+									DoubleSerializer.INSTANCE
+					}),
+					new TupleSerializer<Tuple2<Long, Long>> (
+							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									LongSerializer.INSTANCE,
+									LongSerializer.INSTANCE
+					}),
+					new TupleSerializer<Tuple2<Integer, Long>> (
+							(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									IntSerializer.INSTANCE,
+									LongSerializer.INSTANCE
+					})
+				});
+	}
+
+	@Override
+	protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>[] getSortedTestData() {
+		return this.dataISD;
+	}
+	
+	@Override
+	protected void deepEquals(
+			String message,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>
should,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>
is) {
+		
+		for (int x = 0; x < should.getArity(); x++) {
+			// Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields.
+			if(should.getField(x) instanceof Tuple2) {
+				this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x));
+			}
+			else {
+				assertEquals(message, should.getField(x), is.getField(x));
+			}
+		}// For
+	}
+	
+	protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is)
{
+		for (int x = 0; x < should.getArity(); x++) {
+			assertEquals(message, should.getField(x), is.getField(x));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
new file mode 100644
index 0000000..11f3a5e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorTTT2Test extends TupleComparatorTestBase<Tuple3<Tuple2<String,
Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
+
+	@SuppressWarnings("unchecked")
+	Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[]
dataISD = new Tuple3[]{
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L,
1L), new Tuple2<Integer, Long>(4, -10L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L,
2L), new Tuple2<Integer, Long>(4, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L,
3L), new Tuple2<Integer, Long>(4, 0L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L,
4L), new Tuple2<Integer, Long>(4, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L,
5L), new Tuple2<Integer, Long>(4, 15L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L,
4L), new Tuple2<Integer, Long>(45, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L,
6L), new Tuple2<Integer, Long>(45, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L,
8L), new Tuple2<Integer, Long>(323, 2L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L,
9L), new Tuple2<Integer, Long>(323, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L,
123L), new Tuple2<Integer, Long>(555, 1L))
+		
+	};
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>> createComparator(
+			boolean ascending) {
+		return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>>(
+				new int[] { 0, 2 },
+				new TypeComparator[] {
+						new TupleComparator<Tuple2<String, Double>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+								new StringComparator(ascending),
+								new DoubleComparator(ascending) },
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						new TupleComparator<Tuple2<Long, Long>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+								new LongComparator(ascending),
+								new LongComparator(ascending) },
+								new TypeSerializer[] {
+										LongSerializer.INSTANCE,
+										LongSerializer.INSTANCE }),
+						new TupleComparator<Tuple2<Integer, Long>>(
+								new int[] {	0, 1 },
+								new TypeComparator[] {
+								new IntComparator(ascending),
+								new LongComparator(ascending) },
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) },
+				new TypeSerializer[] {
+						new TupleSerializer<Tuple2<String, Double>>(
+								(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Long, Long>>(
+								(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										LongSerializer.INSTANCE,
+										LongSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Integer, Long>>(
+								(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) });
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>> createSerializer() {
+		return new  TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long,
Long>, Tuple2<Integer, Long>>>(
+				(Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>>) (Class<?>) Tuple3.class,
+				new TypeSerializer[]{
+					new TupleSerializer<Tuple2<String, Double>> (
+							(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									StringSerializer.INSTANCE,
+									DoubleSerializer.INSTANCE}),
+					new TupleSerializer<Tuple2<Long, Long>> (
+							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									LongSerializer.INSTANCE,
+									LongSerializer.INSTANCE}),
+					new TupleSerializer<Tuple2<Integer, Long>> (
+							(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									IntSerializer.INSTANCE,
+									LongSerializer.INSTANCE})
+				});
+	}
+
+	@Override
+	protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>[] getSortedTestData() {
+		return this.dataISD;
+	}
+	
+	@Override
+	protected void deepEquals(
+			String message,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>
should,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>
is) {
+		
+		for (int x = 0; x < should.getArity(); x++) {
+			// Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields.
+			if(should.getField(x) instanceof Tuple2) {
+				this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x));
+			}
+			else {
+				assertEquals(message, should.getField(x), is.getField(x));
+			}
+		}// For
+	}
+	
+	protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is)
{
+		for (int x = 0; x < should.getArity(); x++) {
+			assertEquals(message, should.getField(x), is.getField(x));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
new file mode 100644
index 0000000..1339bca
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorTTT3Test extends TupleComparatorTestBase<Tuple3<Tuple2<String,
Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>{
+	@SuppressWarnings("unchecked")
+	Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[]
dataISD = new Tuple3[]{
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L,
1L), new Tuple2<Integer, Long>(4, -10L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L,
2L), new Tuple2<Integer, Long>(4, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L,
3L), new Tuple2<Integer, Long>(4, 0L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L,
4L), new Tuple2<Integer, Long>(4, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L,
5L), new Tuple2<Integer, Long>(4, 15L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L,
4L), new Tuple2<Integer, Long>(45, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L,
6L), new Tuple2<Integer, Long>(45, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L,
8L), new Tuple2<Integer, Long>(323, 2L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L,
9L), new Tuple2<Integer, Long>(323, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L,
123L), new Tuple2<Integer, Long>(555, 1L))
+		
+	};
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>> createComparator(
+			boolean ascending) {
+		return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>>(
+				new int[] { 0, 1, 2 },
+				new TypeComparator[] {
+						new TupleComparator<Tuple2<String, Double>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+								new StringComparator(ascending),
+								new DoubleComparator(ascending) },
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						new TupleComparator<Tuple2<Long, Long>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+								new LongComparator(ascending),
+								new LongComparator(ascending) },
+								new TypeSerializer[] {
+										LongSerializer.INSTANCE,
+										LongSerializer.INSTANCE }),
+						new TupleComparator<Tuple2<Integer, Long>>(
+								new int[] {	0, 1 },
+								new TypeComparator[] {
+								new IntComparator(ascending),
+								new LongComparator(ascending) },
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) },
+				new TypeSerializer[] {
+						new TupleSerializer<Tuple2<String, Double>>(
+								(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Long, Long>>(
+								(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										LongSerializer.INSTANCE,
+										LongSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Integer, Long>>(
+								(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) });
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>,
Tuple2<Integer, Long>>> createSerializer() {
+		return new  TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long,
Long>, Tuple2<Integer, Long>>>(
+				(Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>>) (Class<?>) Tuple3.class,
+				new TypeSerializer[]{
+					new TupleSerializer<Tuple2<String, Double>> (
+							(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									StringSerializer.INSTANCE,
+									DoubleSerializer.INSTANCE
+					}),
+					new TupleSerializer<Tuple2<Long, Long>> (
+							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									LongSerializer.INSTANCE,
+									LongSerializer.INSTANCE
+					}),
+					new TupleSerializer<Tuple2<Integer, Long>> (
+							(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+									IntSerializer.INSTANCE,
+									LongSerializer.INSTANCE
+					})
+				});
+	}
+
+	@Override
+	protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer,
Long>>[] getSortedTestData() {
+		return this.dataISD;
+	}
+	
+	@Override
+	protected void deepEquals(
+			String message,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>
should,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>
is) {
+		
+		for (int x = 0; x < should.getArity(); x++) {
+			// Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields.
+			if(should.getField(x) instanceof Tuple2) {
+				this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x));
+			}
+			else {
+				assertEquals(message, should.getField(x), is.getField(x));
+			}
+		}// For
+	}
+	
+	protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is)
{
+		for (int x = 0; x < should.getArity(); x++) {
+			assertEquals(message, should.getField(x), is.getField(x));
+		}
+	}
+}


Mime
View raw message