flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [06/11] [FLINK-1032] Rework support for POJO types in the Java API
Date Wed, 08 Oct 2014 09:40:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index 9ae2d6b..9d7eed4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
+import java.util.List;
 
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -32,10 +34,11 @@ import org.apache.flink.types.NullKeyFieldException;
 import org.apache.flink.util.InstantiationUtil;
 
 
-public final class PojoComparator<T> extends TypeComparator<T> implements java.io.Serializable {
-
+public final class PojoComparator<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
+	
 	private static final long serialVersionUID = 1L;
 
+	// Reflection fields for the comp fields
 	private transient Field[] keyFields;
 
 	private final TypeComparator<Object>[] comparators;
@@ -52,8 +55,6 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 
 	private final Class<T> type;
 
-	private final Comparable[] extractedKeys;
-
 	@SuppressWarnings("unchecked")
 	public PojoComparator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, Class<T> type) {
 		this.keyFields = keyFields;
@@ -70,6 +71,12 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 
 		for (int i = 0; i < this.comparators.length; i++) {
 			TypeComparator<?> k = this.comparators[i];
+			if(k == null) {
+				throw new IllegalArgumentException("One of the passed comparators is null");
+			}
+			if(keyFields[i] == null) {
+				throw new IllegalArgumentException("One of the passed reflection fields is null");
+			}
 
 			// as long as the leading keys support normalized keys, we can build up the composite key
 			if (k.supportsNormalizedKey()) {
@@ -102,8 +109,6 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 		this.numLeadingNormalizableKeys = nKeys;
 		this.normalizableKeyPrefixLen = nKeyLen;
 		this.invertNormKey = inverted;
-
-		extractedKeys = new Comparable[keyFields.length];
 	}
 
 	@SuppressWarnings("unchecked")
@@ -130,8 +135,6 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 		} catch (ClassNotFoundException e) {
 			throw new RuntimeException("Cannot copy serializer", e);
 		}
-
-		extractedKeys = new Comparable[keyFields.length];
 	}
 
 	private void writeObject(ObjectOutputStream out)
@@ -150,87 +153,87 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 		int numKeyFields = in.readInt();
 		keyFields = new Field[numKeyFields];
 		for (int i = 0; i < numKeyFields; i++) {
-			Class<?> clazz = (Class<?>)in.readObject();
+			Class<?> clazz = (Class<?>) in.readObject();
 			String fieldName = in.readUTF();
-			keyFields[i] = null;
 			// try superclasses as well
 			while (clazz != null) {
 				try {
-					keyFields[i] = clazz.getDeclaredField(fieldName);
-					keyFields[i].setAccessible(true);
+					Field field = clazz.getDeclaredField(fieldName);
+					field.setAccessible(true);
+					keyFields[i] = field;
 					break;
 				} catch (NoSuchFieldException e) {
 					clazz = clazz.getSuperclass();
 				}
 			}
-			if (keyFields[i] == null) {
+			if (keyFields[i] == null ) {
 				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
 						+ " (" + fieldName + ")");
 			}
 		}
 	}
 
-
 	public Field[] getKeyFields() {
 		return this.keyFields;
 	}
 
-	public TypeComparator[] getComparators() {
-		return this.comparators;
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public void getFlatComparator(List<TypeComparator> flatComparators) {
+		for(int i = 0; i < comparators.length; i++) {
+			if(comparators[i] instanceof CompositeTypeComparator) {
+				((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators);
+			} else {
+				flatComparators.add(comparators[i]);
+			}
+		}
+	}
+	
+	/**
+	 * This method is handling the IllegalAccess exceptions of Field.get()
+	 */
+	private final Object accessField(Field field, Object object) {
+		try {
+			object = field.get(object);
+		} catch (NullPointerException npex) {
+			throw new NullKeyFieldException("Unable to access field "+field+" on object "+object);
+		} catch (IllegalAccessException iaex) {
+			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo."
+			+ " fiels: " + field + " obj: " + object);
+		}
+		return object;
 	}
 
 	@Override
 	public int hash(T value) {
 		int i = 0;
-		try {
-			int code = 0;
-			for (; i < this.keyFields.length; i++) {
-				code ^= this.comparators[i].hash(this.keyFields[i].get(value));
-				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
-			}
-			return code;
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i].toString());
-		}
-		catch (IllegalAccessException iaex) {
-			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
+		int code = 0;
+		for (; i < this.keyFields.length; i++) {
+			code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
+			code += this.comparators[i].hash(accessField(keyFields[i], value));
+			
 		}
+		return code;
+
 	}
 
 	@Override
 	public void setReference(T toCompare) {
 		int i = 0;
-		try {
-			for (; i < this.keyFields.length; i++) {
-				this.comparators[i].setReference(this.keyFields[i].get(toCompare));
-			}
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i].toString());
-		}
-		catch (IllegalAccessException iaex) {
-			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
+		for (; i < this.keyFields.length; i++) {
+			this.comparators[i].setReference(accessField(keyFields[i], toCompare));
 		}
 	}
 
 	@Override
 	public boolean equalToReference(T candidate) {
 		int i = 0;
-		try {
-			for (; i < this.keyFields.length; i++) {
-				if (!this.comparators[i].equalToReference(this.keyFields[i].get(candidate))) {
-					return false;
-				}
+		for (; i < this.keyFields.length; i++) {
+			if (!this.comparators[i].equalToReference(accessField(keyFields[i], candidate))) {
+				return false;
 			}
-			return true;
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i].toString());
-		}
-		catch (IllegalAccessException iaex) {
-			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
 		}
+		return true;
 	}
 
 	@Override
@@ -255,22 +258,17 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 	@Override
 	public int compare(T first, T second) {
 		int i = 0;
-		try {
-			for (; i < keyFields.length; i++) {
-				int cmp = comparators[i].compare(keyFields[i].get(first),keyFields[i].get(second));
-				if (cmp != 0) {
-					return cmp;
-				}
+		for (; i < keyFields.length; i++) {
+			int cmp = comparators[i].compare(accessField(keyFields[i], first), accessField(keyFields[i], second));
+			if (cmp != 0) {
+				return cmp;
 			}
-
-			return 0;
-		} catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyFields[i].toString() + " " + first.toString() + " " + second.toString());
-		} catch (IllegalAccessException iaex) {
-			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
 		}
+
+		return 0;
 	}
 
+	
 	@Override
 	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		T first = this.serializer.createInstance();
@@ -302,21 +300,13 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 	@Override
 	public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
 		int i = 0;
-		try {
-			for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++)
-			{
-				int len = this.normalizedKeyLengths[i];
-				len = numBytes >= len ? len : numBytes;
-				this.comparators[i].putNormalizedKey(this.keyFields[i].get(value), target, offset, len);
-				numBytes -= len;
-				offset += len;
-			}
-		}
-		catch (IllegalAccessException iaex) {
-			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i].toString());
+		for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++)
+		{
+			int len = this.normalizedKeyLengths[i];
+			len = numBytes >= len ? len : numBytes;
+			this.comparators[i].putNormalizedKey(accessField(keyFields[i], value), target, offset, len);
+			numBytes -= len;
+			offset += len;
 		}
 	}
 
@@ -347,36 +337,21 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 	}
 
 	@Override
-	public Object[] extractKeys(T record) {
-		int i = 0;
-		try {
-			for (; i < keyFields.length; i++) {
-				extractedKeys[i] = (Comparable) keyFields[i].get(record);
+	public int extractKeys(Object record, Object[] target, int index) {
+		int localIndex = index;
+		for (int i = 0; i < comparators.length; i++) {
+			if(comparators[i] instanceof PojoComparator || comparators[i] instanceof TupleComparator) {
+				localIndex += comparators[i].extractKeys(accessField(keyFields[i], record), target, localIndex) -1;
+			} else {
+				// non-composite case (= atomic). We can assume this to have only one key.
+				// comparators[i].extractKeys(accessField(keyFields[i], record), target, i);
+				target[localIndex] = accessField(keyFields[i], record);
 			}
+			localIndex++;
 		}
-		catch (IllegalAccessException iaex) {
-			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i].toString());
-		}
-		return extractedKeys;
+		return localIndex - index;
 	}
 
 	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * A sequence of prime numbers to be used for salting the computed hash values.
-	 * Based on some empirical evidence, we are using a 32-element subsequence of the
-	 * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
-	 *
-	 * @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
-	 * @see: http://oeis.org/A068652
-	 */
-	private static final int[] HASH_SALT = new int[] {
-		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   ,
-		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  ,
-		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 ,
-		19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 71f2cd8..99b9f65 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -174,11 +174,22 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public void serialize(T value, DataOutputView target) throws IOException {
+		// handle null values
+		if (value == null) {
+			target.writeBoolean(true);
+			return;
+		} else {
+			target.writeBoolean(false);
+		}
 		try {
-
 			for (int i = 0; i < numFields; i++) {
 				Object o = fields[i].get(value);
-				fieldSerializers[i].serialize(o, target);
+				if(o == null) {
+					target.writeBoolean(true); // null field handling
+				} else {
+					target.writeBoolean(false);
+					fieldSerializers[i].serialize(o, target);
+				}
 			}
 		} catch (IllegalAccessException e) {
 			throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
@@ -188,6 +199,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public T deserialize(DataInputView source) throws IOException {
+		boolean isNull = source.readBoolean();
+		if(isNull) {
+			return null;
+		}
 		T target;
 		try {
 			target = clazz.newInstance();
@@ -198,8 +213,13 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		
 		try {
 			for (int i = 0; i < numFields; i++) {
-				Object field = fieldSerializers[i].deserialize(source);
-				fields[i].set(target, field);
+				isNull = source.readBoolean();
+				if(isNull) {
+					fields[i].set(target, null);
+				} else {
+					Object field = fieldSerializers[i].deserialize(source);
+					fields[i].set(target, field);
+				}
 			}
 		} catch (IllegalAccessException e) {
 			throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
@@ -210,10 +230,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	
 	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
+		// handle null values
+		boolean isNull = source.readBoolean();
+		if (isNull) {
+			return null;
+		}
 		try {
 			for (int i = 0; i < numFields; i++) {
-				Object field = fieldSerializers[i].deserialize(fields[i].get(reuse), source);
-				fields[i].set(reuse, field);
+				isNull = source.readBoolean();
+				if(isNull) {
+					fields[i].set(reuse, null);
+				} else {
+					Object field = fieldSerializers[i].deserialize(fields[i].get(reuse), source);
+					fields[i].set(reuse, field);
+				}
 			}
 		} catch (IllegalAccessException e) {
 			throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
@@ -224,7 +254,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		// copy the Non-Null/Null tag
+		target.writeBoolean(source.readBoolean());
 		for (int i = 0; i < numFields; i++) {
+			target.writeBoolean(source.readBoolean());
 			fieldSerializers[i].copy(source, target);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
index eee6643..31e28f7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
@@ -28,7 +28,6 @@ public final class RuntimePairComparatorFactory<T1, T2>
 
 	private static final long serialVersionUID = 1L;
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public TypePairComparator<T1, T2> createComparator12(
 			TypeComparator<T1> comparator1,
@@ -36,7 +35,6 @@ public final class RuntimePairComparatorFactory<T1, T2>
 		return new GenericPairComparator<T1, T2>(comparator1, comparator2);
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public TypePairComparator<T2, T1> createComparator21(
 			TypeComparator<T1> comparator1,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index f9b4084..61a1567 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -30,26 +30,20 @@ import org.apache.flink.types.NullKeyFieldException;
 public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> {
 
 	private static final long serialVersionUID = 1L;
-
-	private final Object[] extractedKeys;
-
-	@SuppressWarnings("unchecked")
+	
 	public TupleComparator(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
 		super(keyPositions, comparators, serializers);
-		extractedKeys = new Object[keyPositions.length];
 	}
 	
-	@SuppressWarnings("unchecked")
 	private TupleComparator(TupleComparator<T> toClone) {
 		super(toClone);
-		extractedKeys = new Object[keyPositions.length];
-
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Comparator Methods
 	// --------------------------------------------------------------------------------------------
 	
+	@SuppressWarnings("unchecked")
 	@Override
 	public int hash(T value) {
 		int i = 0;
@@ -70,6 +64,7 @@ public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public void setReference(T toCompare) {
 		int i = 0;
@@ -86,6 +81,7 @@ public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public boolean equalToReference(T candidate) {
 		int i = 0;
@@ -105,6 +101,7 @@ public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<
 		}
 	}
 	
+	@SuppressWarnings("unchecked")
 	@Override
 	public int compare(T first, T second) {
 		int i = 0;
@@ -127,6 +124,7 @@ public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
 		int i = 0;
@@ -146,11 +144,19 @@ public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<
 	}
 
 	@Override
-	public Object[] extractKeys(T record) {
-		for (int i = 0; i < keyPositions.length; i++) {
-			extractedKeys[i] = record.getField(keyPositions[i]);
+	public int extractKeys(Object record, Object[] target, int index) {
+		int localIndex = index;
+		for(int i = 0; i < comparators.length; i++) {
+			// handle nested case
+			if(comparators[i] instanceof TupleComparator || comparators[i] instanceof PojoComparator) {
+				localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex) -1;
+			} else {
+				// flat
+				target[localIndex] = ((Tuple) record).getField(keyPositions[i]);
+			}
+			localIndex++;
 		}
-		return extractedKeys;
+		return localIndex - index;
 	}
 
 	public TypeComparator<T> duplicate() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
index 4c26318..abcf89c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
@@ -15,9 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.api.java.typeutils.runtime;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -26,15 +29,16 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.KeyFieldOutOfBoundsException;
 import org.apache.flink.types.NullKeyFieldException;
 
-import java.io.IOException;
 
+public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
 
-public abstract class TupleComparatorBase<T> extends TypeComparator<T> implements java.io.Serializable {
+	private static final long serialVersionUID = 1L;
 
 	/** key positions describe which fields are keys in what order */
 	protected int[] keyPositions;
 
 	/** comparators for the key fields, in the same order as the key fields */
+	@SuppressWarnings("rawtypes")
 	protected TypeComparator[] comparators;
 
 	/** serializer factories to duplicate non thread-safe serializers */
@@ -51,6 +55,7 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 
 
 	/** serializers to deserialize the first n fields for comparison */
+	@SuppressWarnings("rawtypes")
 	protected transient TypeSerializer[] serializers;
 
 	// cache for the deserialized field objects
@@ -115,7 +120,6 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 		this.invertNormKey = inverted;
 	}
 
-	@SuppressWarnings("unchecked")
 	protected TupleComparatorBase(TupleComparatorBase<T> toClone) {
 		privateDuplicate(toClone);
 	}
@@ -146,14 +150,23 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 		return this.keyPositions;
 	}
 	
-	public TypeComparator[] getComparators() {
-		return this.comparators;
-	}
 	
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public void getFlatComparator(List<TypeComparator> flatComparators) {
+		for(int i = 0; i < comparators.length; i++) {
+			if(comparators[i] instanceof CompositeTypeComparator) {
+				((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators);
+			} else {
+				flatComparators.add(comparators[i]);
+			}
+		}
+	}	
 	// --------------------------------------------------------------------------------------------
 	//  Comparator Methods
 	// --------------------------------------------------------------------------------------------
 
+
 	@Override
 	public int compareToReference(TypeComparator<T> referencedComparator) {
 		TupleComparatorBase<T> other = (TupleComparatorBase<T>) referencedComparator;
@@ -161,6 +174,7 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 		int i = 0;
 		try {
 			for (; i < this.keyPositions.length; i++) {
+				@SuppressWarnings("unchecked")
 				int cmp = this.comparators[i].compareToReference(other.comparators[i]);
 				if (cmp != 0) {
 					return cmp;
@@ -176,6 +190,7 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 		}
 	}
 	
+	@SuppressWarnings("unchecked")
 	@Override
 	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
 		if (deserializedFields1 == null) {
@@ -201,7 +216,7 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 		} catch (NullPointerException npex) {
 			throw new NullKeyFieldException(keyPositions[i]);
 		} catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+			throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
 		}
 	}
 	
@@ -245,7 +260,6 @@ public abstract class TupleComparatorBase<T> extends TypeComparator<T> implement
 	
 	// --------------------------------------------------------------------------------------------
 	
-	@SuppressWarnings("unchecked")
 	protected final void instantiateDeserializationUtils() {
 		if (this.serializers == null) {
 			this.serializers = new TypeSerializer[this.serializerFactories.length];

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
index a510863..eca1e6c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -47,8 +47,7 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
 	
 	private transient Kryo kryo;
 
-	private final Comparable[] extractedKey = new Comparable[1];
-
+	@SuppressWarnings("rawtypes")
 	private final TypeComparator[] comparators = new TypeComparator[] {this};
 
 	public ValueComparator(boolean ascending, Class<T> type) {
@@ -145,13 +144,14 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
 	}
 
 	@Override
-	public Object[] extractKeys(T record) {
-		extractedKey[0] = record;
-		return extractedKey;
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
 	}
 
+	@SuppressWarnings("rawtypes")
 	@Override
-	public TypeComparator[] getComparators() {
+	public TypeComparator[] getFlatComparators() {
 		return comparators;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
index a8c4ef5..88985bb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -44,8 +44,7 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	
 	private transient Kryo kryo;
 
-	private final Comparable[] extractedKey = new Comparable[1];
-
+	@SuppressWarnings("rawtypes")
 	private final TypeComparator[] comparators = new TypeComparator[] {this};
 
 	public WritableComparator(boolean ascending, Class<T> type) {
@@ -129,12 +128,13 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	}
 
 	@Override
-	public Object[] extractKeys(T record) {
-		extractedKey[0] = record;
-		return extractedKey;
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
 	}
 
-	@Override public TypeComparator[] getComparators() {
+	@SuppressWarnings("rawtypes")
+	@Override public TypeComparator[] getFlatComparators() {
 		return comparators;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index 898eae5..d686633 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -29,10 +28,10 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operator.JoinOperatorTest.CustomType;
 
 @SuppressWarnings("serial")
 public class CoGroupOperatorTest {
@@ -127,7 +126,6 @@ public class CoGroupOperatorTest {
 		ds1.coGroup(ds2).where(5).equalTo(0);
 	}
 
-	@Ignore
 	@Test
 	public void testCoGroupKeyExpressions1() {
 
@@ -137,13 +135,12 @@ public class CoGroupOperatorTest {
 
 		// should work
 		try {
-//			ds1.coGroup(ds2).where("myInt").equalTo("myInt");
+			ds1.coGroup(ds2).where("myInt").equalTo("myInt");
 		} catch(Exception e) {
 			Assert.fail();
 		}
 	}
 
-	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyExpressions2() {
 
@@ -152,10 +149,9 @@ public class CoGroupOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, incompatible cogroup key types
-//		ds1.coGroup(ds2).where("myInt").equalTo("myString");
+		ds1.coGroup(ds2).where("myInt").equalTo("myString");
 	}
 
-	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyExpressions3() {
 
@@ -164,10 +160,9 @@ public class CoGroupOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, incompatible number of cogroup keys
-//		ds1.coGroup(ds2).where("myInt", "myString").equalTo("myString");
+		ds1.coGroup(ds2).where("myInt", "myString").equalTo("myString");
 	}
 
-	@Ignore
 	@Test(expected = IllegalArgumentException.class)
 	public void testCoGroupKeyExpressions4() {
 
@@ -176,9 +171,58 @@ public class CoGroupOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, cogroup key non-existent
-//		ds1.coGroup(ds2).where("myNonExistent").equalTo("myInt");
+		ds1.coGroup(ds2).where("myNonExistent").equalTo("myInt");
+	}
+	
+	@Test
+	public void testCoGroupKeyExpressions1Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should work
+		try {
+			ds1.coGroup(ds2).where("nested.myInt").equalTo("nested.myInt");
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
 	}
 
+	@Test(expected = InvalidProgramException.class)
+	public void testCoGroupKeyExpressions2Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should not work, incompatible cogroup key types
+		ds1.coGroup(ds2).where("nested.myInt").equalTo("nested.myString");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testCoGroupKeyExpressions3Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should not work, incompatible number of cogroup keys
+		ds1.coGroup(ds2).where("nested.myInt", "nested.myString").equalTo("nested.myString");
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testCoGroupKeyExpressions4Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should not work, cogroup key non-existent
+		ds1.coGroup(ds2).where("nested.myNonExistent").equalTo("nested.myInt");
+	}
+	
 	@Test
 	public void testCoGroupKeySelectors1() {
 		
@@ -304,26 +348,4 @@ public class CoGroupOperatorTest {
 					}
 				);
 	}
-		
-	public static class CustomType implements Serializable {
-		
-		private static final long serialVersionUID = 1L;
-		
-		public int myInt;
-		public long myLong;
-		public String myString;
-		
-		public CustomType() {};
-		
-		public CustomType(int i, long l, String s) {
-			myInt = i;
-			myLong = l;
-			myString = s;
-		}
-		
-		@Override
-		public String toString() {
-			return myInt+","+myLong+","+myString;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index adc8917..c958680 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class GroupingTest {
@@ -112,7 +111,6 @@ public class GroupingTest {
 		tupleDs.groupBy(-1);
 	}
 
-	@Ignore
 	@Test
 	public void testGroupByKeyExpressions1() {
 
@@ -124,24 +122,22 @@ public class GroupingTest {
 
 		// should work
 		try {
-//			ds.groupBy("myInt");
+			ds.groupBy("myInt");
 		} catch(Exception e) {
 			Assert.fail();
 		}
 	}
 
-	@Ignore
-	@Test(expected = UnsupportedOperationException.class)
+	@Test(expected = IllegalArgumentException.class)
 	public void testGroupByKeyExpressions2() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
 		// should not work: groups on basic type
-//		longDs.groupBy("myInt");
+		longDs.groupBy("myInt");
 	}
 
-	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testGroupByKeyExpressions3() {
 
@@ -150,12 +146,11 @@ public class GroupingTest {
 		this.customTypeData.add(new CustomType());
 
 		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
-		// should not work: groups on custom type
+		// should not work: tuple selector on custom type
 		customDs.groupBy(0);
 
 	}
 
-	@Ignore
 	@Test(expected = IllegalArgumentException.class)
 	public void testGroupByKeyExpressions4() {
 
@@ -163,7 +158,34 @@ public class GroupingTest {
 		DataSet<CustomType> ds = env.fromCollection(customTypeData);
 
 		// should not work, key out of tuple bounds
-//		ds.groupBy("myNonExistent");
+		ds.groupBy("myNonExistent");
+	}
+
+	@Test
+	public void testGroupByKeyExpressions1Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		this.customTypeData.add(new CustomType());
+
+		DataSet<CustomType> ds = env.fromCollection(customTypeData);
+
+		// should work
+		try {
+			ds.groupBy("nested.myInt");
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testGroupByKeyExpressions2Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds = env.fromCollection(customTypeData);
+
+		// should not work, key out of tuple bounds
+		ds.groupBy("nested.myNonExistent");
 	}
 
 	
@@ -309,11 +331,15 @@ public class GroupingTest {
 
 	public static class CustomType implements Serializable {
 		
+		public static class Nest {
+			public int myInt;
+		}
 		private static final long serialVersionUID = 1L;
 		
 		public int myInt;
 		public long myLong;
 		public String myString;
+		public Nest nested;
 		
 		public CustomType() {};
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index 962755e..de50fd8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -22,17 +22,20 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.Assert;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
 @SuppressWarnings("serial")
 public class JoinOperatorTest {
@@ -41,20 +44,51 @@ public class JoinOperatorTest {
 	private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = 
 			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
 	
-	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
-			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = 
+			new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
 					BasicTypeInfo.INT_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.STRING_TYPE_INFO,
 					BasicTypeInfo.LONG_TYPE_INFO,
 					BasicTypeInfo.INT_TYPE_INFO
 			);
+	// TUPLE DATA with nested Tuple2
+	private static final List<Tuple5<Tuple2<Integer, String>, Long, String, Long, Integer>> emptyNestedTupleData = 
+			new ArrayList<Tuple5<Tuple2<Integer, String>, Long, String, Long, Integer>>();
 
+	private final TupleTypeInfo<Tuple5<Tuple2<Integer, String>, Long, String, Long, Integer>> nestedTupleTypeInfo =
+			new TupleTypeInfo<Tuple5<Tuple2<Integer, String>, Long, String, Long, Integer>>(
+					new TupleTypeInfo<Tuple2<Integer, String>> (BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	// TUPLE DATA with nested CustomType
+	private static final List<Tuple5<CustomType, Long, String, Long, Integer>> emptyNestedCustomTupleData = 
+			new ArrayList<Tuple5<CustomType, Long, String, Long, Integer>>();
+
+	private final TupleTypeInfo<Tuple5<CustomType, Long, String, Long, Integer>> nestedCustomTupleTypeInfo =
+			new TupleTypeInfo<Tuple5<CustomType, Long, String, Long, Integer>>(
+					TypeExtractor.getForClass(CustomType.class),
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	private static List<CustomTypeWithTuple> customTypeWithTupleData = new ArrayList<CustomTypeWithTuple>();
 	private static List<CustomType> customTypeData = new ArrayList<CustomType>();
 	
+	private static List<NestedCustomType> customNestedTypeData = new ArrayList<NestedCustomType>();
+	
+	
 	@BeforeClass
 	public static void insertCustomData() {
 		customTypeData.add(new CustomType());
+		customTypeWithTupleData.add(new CustomTypeWithTuple());
+		customNestedTypeData.add(new NestedCustomType());
 	}
 	
 	@Test  
@@ -127,7 +161,6 @@ public class JoinOperatorTest {
 		ds1.join(ds2).where(5).equalTo(0);
 	}
 
-	@Ignore
 	@Test
 	public void testJoinKeyExpressions1() {
 
@@ -137,13 +170,29 @@ public class JoinOperatorTest {
 
 		// should work
 		try {
-//			ds1.join(ds2).where("myInt").equalTo("myInt");
+			ds1.join(ds2).where("myInt").equalTo("myInt");
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+	
+	@Test
+	public void testJoinKeyExpressionsNested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<NestedCustomType> ds1 = env.fromCollection(customNestedTypeData);
+		DataSet<NestedCustomType> ds2 = env.fromCollection(customNestedTypeData);
+
+		// should work
+		try {
+			ds1.join(ds2).where("myInt").equalTo("myInt");
 		} catch(Exception e) {
 			Assert.fail();
 		}
 	}
+	
+	
 
-	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testJoinKeyExpressions2() {
 
@@ -152,10 +201,9 @@ public class JoinOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, incompatible join key types
-//		ds1.join(ds2).where("myInt").equalTo("myString");
+		ds1.join(ds2).where("myInt").equalTo("myString");
 	}
 
-	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testJoinKeyExpressions3() {
 
@@ -164,10 +212,9 @@ public class JoinOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, incompatible number of join keys
-//		ds1.join(ds2).where("myInt", "myString").equalTo("myString");
+		ds1.join(ds2).where("myInt", "myString").equalTo("myString");
 	}
 
-	@Ignore
 	@Test(expected = IllegalArgumentException.class)
 	public void testJoinKeyExpressions4() {
 
@@ -176,7 +223,230 @@ public class JoinOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, join key non-existent
-//		ds1.join(ds2).where("myNonExistent").equalTo("myInt");
+		ds1.join(ds2).where("myNonExistent").equalTo("myInt");
+	}
+	
+	/**
+	 * Test if mixed types of key selectors are properly working.
+	 */
+	@Test
+	public void testJoinKeyMixedKeySelector() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+		try {
+			ds1.join(ds2).where("myInt").equalTo(new KeySelector<CustomType, Integer>() {
+				@Override
+				public Integer getKey(CustomType value) throws Exception {
+					return value.myInt;
+				}
+			});
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test
+	public void testJoinKeyMixedKeySelectorTurned() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+		try {
+			ds1.join(ds2).where(new KeySelector<CustomType, Integer>() {
+				@Override
+				public Integer getKey(CustomType value) throws Exception {
+					return value.myInt;
+				}
+			}).equalTo("myInt");
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test
+	public void testJoinKeyMixedTupleIndex() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		try {
+			ds1.join(ds2).where("f0").equalTo(4);
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test
+	public void testJoinKeyNestedTuples() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Tuple2<Integer, String>, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		try {
+			ds1.join(ds2).where("f0.f0").equalTo(4);
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test
+	public void testJoinKeyNestedTuplesWithCustom() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<CustomType, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyNestedCustomTupleData, nestedCustomTupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		try {
+			TypeInformation<?> t = ds1.join(ds2).where("f0.myInt").equalTo(4).getType();
+			Assert.assertTrue("not a composite type", t instanceof CompositeType);
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test
+	public void testJoinKeyWithCustomContainingTuple0() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomTypeWithTuple> ds1 = env.fromCollection(customTypeWithTupleData);
+		DataSet<CustomTypeWithTuple> ds2 = env.fromCollection(customTypeWithTupleData);
+		try {
+			ds1.join(ds2).where("intByString.f0").equalTo("myInt");
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test
+	public void testJoinKeyWithCustomContainingTuple1() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomTypeWithTuple> ds1 = env.fromCollection(customTypeWithTupleData);
+		DataSet<CustomTypeWithTuple> ds2 = env.fromCollection(customTypeWithTupleData);
+		try {
+			ds1.join(ds2).where("nested.myInt").equalTo("intByString.f0");
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test
+	public void testJoinKeyWithCustomContainingTuple2() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomTypeWithTuple> ds1 = env.fromCollection(customTypeWithTupleData);
+		DataSet<CustomTypeWithTuple> ds2 = env.fromCollection(customTypeWithTupleData);
+		try {
+			ds1.join(ds2).where("nested.myInt", "myInt", "intByString.f1").equalTo("intByString.f0","myInt", "myString");
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyNestedTuplesWrongType() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Tuple2<Integer, String>, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		ds1.join(ds2).where("f0.f1").equalTo(4); // f0.f1 is a String
+	}
+	
+	@Test
+	public void testJoinKeyMixedTupleIndexTurned() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		try {
+			ds1.join(ds2).where(0).equalTo("f0");
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyMixedTupleIndexWrongType() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		ds1.join(ds2).where("f0").equalTo(3); // 3 is of type long, so it should fail
+	}
+	
+	@Test
+	public void testJoinKeyMixedTupleIndex2() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		try {
+			ds1.join(ds2).where("myInt").equalTo(4);
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyMixedWrong() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+		// wrongly mix String and Integer
+		ds1.join(ds2).where("myString").equalTo(new KeySelector<CustomType, Integer>() {
+			@Override
+			public Integer getKey(CustomType value) throws Exception {
+				return value.myInt;
+			}
+		});
+	}
+	
+	@Test
+	public void testJoinKeyExpressions1Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should work
+		try {
+			ds1.join(ds2).where("nested.myInt").equalTo("nested.myInt");
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyExpressions2Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should not work, incompatible join key types
+		ds1.join(ds2).where("nested.myInt").equalTo("nested.myString");
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testJoinKeyExpressions3Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should not work, incompatible number of join keys
+		ds1.join(ds2).where("nested.myInt", "nested.myString").equalTo("nested.myString");
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testJoinKeyExpressions4Nested() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should not work, join key non-existent
+		ds1.join(ds2).where("nested.myNonExistent").equalTo("nested.myInt");
 	}
 
 	
@@ -235,6 +505,7 @@ public class JoinOperatorTest {
 					)
 			.equalTo(3);
 		} catch(Exception e) {
+			e.printStackTrace();
 			Assert.fail();
 		}
 	}
@@ -403,7 +674,6 @@ public class JoinOperatorTest {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
-
 		// should work
 		try {
 			ds1.join(ds2)
@@ -430,6 +700,7 @@ public class JoinOperatorTest {
 				.types(CustomType.class, CustomType.class);
 		} catch(Exception e) {
 			System.out.println("FAILED: " + e);
+			e.printStackTrace();
 			Assert.fail();
 		}
 	}
@@ -550,13 +821,58 @@ public class JoinOperatorTest {
 	 * ####################################################################
 	 */
 
+	public static class Nested implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+
+		public Nested() {};
+
+		public Nested(int i, long l, String s) {
+			myInt = i;
+		}
+
+		@Override
+		public String toString() {
+			return ""+myInt;
+		}
+	}
+	// a simple nested type (only basic types)
+	public static class NestedCustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public long myLong;
+		public String myString;
+		public Nested nest;
+		
+		public NestedCustomType() {};
+
+		public NestedCustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+
+		@Override
+		public String toString() {
+			return myInt+","+myLong+","+myString+","+nest;
+		}
+	}
+
 	public static class CustomType implements Serializable {
 		
 		private static final long serialVersionUID = 1L;
 		
 		public int myInt;
 		public long myLong;
+		public NestedCustomType nested;
 		public String myString;
+		public Object nothing;
+	//	public List<String> countries; need Kryo to support this
+	//	public Writable interfaceTest; need kryo
 		
 		public CustomType() {};
 		
@@ -564,6 +880,7 @@ public class JoinOperatorTest {
 			myInt = i;
 			myLong = l;
 			myString = s;
+			nested = new NestedCustomType(i, l, s);
 		}
 		
 		@Override
@@ -571,4 +888,33 @@ public class JoinOperatorTest {
 			return myInt+","+myLong+","+myString;
 		}
 	}
+	
+	
+	public static class CustomTypeWithTuple implements Serializable {
+		
+		private static final long serialVersionUID = 1L;
+		
+		public int myInt;
+		public long myLong;
+		public NestedCustomType nested;
+		public String myString;
+		public Tuple2<Integer, String> intByString;
+		
+		public CustomTypeWithTuple() {};
+		
+		public CustomTypeWithTuple(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+			nested = new NestedCustomType(i, l, s);
+			intByString = new Tuple2<Integer, String>(i, s);
+		}
+		
+		@Override
+		public String toString() {
+			return myInt+","+myLong+","+myString;
+		}
+	}
+
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
new file mode 100644
index 0000000..00ab520
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+@RunWith(PowerMockRunner.class)
+public class KeysTest {
+	
+	@Test
+	public void testTupleRangeCheck() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+		
+		// test private static final int[] rangeCheckFields(int[] fields, int maxAllowedField)
+		Method rangeCheckFieldsMethod = Whitebox.getMethod(Keys.class, "rangeCheckFields", int[].class, int.class);
+		int[] result = (int[]) rangeCheckFieldsMethod.invoke(null, new int[] {1,2,3,4}, 4);
+		Assert.assertArrayEquals(new int[] {1,2,3,4}, result);
+		
+		// test duplicate elimination
+		result = (int[]) rangeCheckFieldsMethod.invoke(null, new int[] {1,2,2,3,4}, 4);
+		Assert.assertArrayEquals(new int[] {1,2,3,4}, result);
+		
+		result = (int[]) rangeCheckFieldsMethod.invoke(null, new int[] {1,2,2,2,2,2,2,3,3,4}, 4);
+		Assert.assertArrayEquals(new int[] {1,2,3,4}, result);
+		
+		// corner case tests
+		result = (int[]) rangeCheckFieldsMethod.invoke(null, new int[] {0}, 0);
+		Assert.assertArrayEquals(new int[] {0}, result);
+		
+		Throwable ex = null;
+		try {
+			// throws illegal argument.
+			result = (int[]) rangeCheckFieldsMethod.invoke(null, new int[] {5}, 0);
+		} catch(Throwable iae) {
+			ex = iae;
+		}
+		Assert.assertNotNull(ex);
+	}
+	
+	@Test
+	public void testStandardTupleKeys() {
+		TupleTypeInfo<Tuple7<String, String, String, String, String, String, String>> typeInfo = new TupleTypeInfo<Tuple7<String, String, String, String, String, String, String>>(
+				BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO);
+		
+		ExpressionKeys<Tuple7<String, String, String, String, String, String, String>> ek;
+		
+		for( int i = 1; i < 8; i++) {
+			int[] ints = new int[i];
+			for( int j = 0; j < i; j++) {
+				ints[j] = j;
+			}
+			int[] inInts = Arrays.copyOf(ints, ints.length); // copy, just to make sure that the code is not cheating by changing the ints.
+			ek = new ExpressionKeys<Tuple7<String, String, String, String, String, String, String>>(inInts, typeInfo);
+			Assert.assertArrayEquals(ints, ek.computeLogicalKeyPositions());
+			Assert.assertEquals(ints.length, ek.computeLogicalKeyPositions().length);
+			
+			ArrayUtils.reverse(ints);
+			inInts = Arrays.copyOf(ints, ints.length);
+			ek = new ExpressionKeys<Tuple7<String, String, String, String, String, String, String>>(inInts, typeInfo);
+			Assert.assertArrayEquals(ints, ek.computeLogicalKeyPositions());
+			Assert.assertEquals(ints.length, ek.computeLogicalKeyPositions().length);
+		}
+	}
+	
+	@Test 
+	public void testInvalid() throws Throwable {
+		TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, String>> typeInfo = new TupleTypeInfo<Tuple3<String,Tuple3<String,String,String>,String>>(
+				BasicTypeInfo.STRING_TYPE_INFO,
+				new TupleTypeInfo<Tuple3<String, String, String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO);
+		ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>> fpk;
+		
+		String[][] tests = new String[][] {
+				new String[] {"f11"},new String[] {"f-35"}, new String[] {"f0.f33"}, new String[] {"f1.f33"}
+		};
+		for(int i = 0; i < tests.length; i++) {
+			Throwable e = null;
+			try {
+				fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(tests[i], typeInfo);
+			} catch(Throwable t) {
+				// System.err.println("Message: "+t.getMessage()); t.printStackTrace();
+				e = t;	
+			}
+			Assert.assertNotNull(e);
+		}
+	}
+	
+	@Test
+	public void testTupleKeyExpansion() {
+		TupleTypeInfo<Tuple3<String, Tuple3<String, String, String>, String>> typeInfo = new TupleTypeInfo<Tuple3<String,Tuple3<String,String,String>,String>>(
+				BasicTypeInfo.STRING_TYPE_INFO,
+				new TupleTypeInfo<Tuple3<String, String, String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO);
+		ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>> fpk = 
+				new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new int[] {0}, typeInfo);
+		Assert.assertArrayEquals(new int[] {0}, fpk.computeLogicalKeyPositions());
+		
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new int[] {1}, typeInfo);
+		Assert.assertArrayEquals(new int[] {1,2,3}, fpk.computeLogicalKeyPositions());
+		
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new int[] {2}, typeInfo);
+		Assert.assertArrayEquals(new int[] {4}, fpk.computeLogicalKeyPositions());
+		
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new int[] {0,1,2}, typeInfo);
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions());
+		
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(null, typeInfo, true); // empty case
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions());
+		
+		// duplicate elimination
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new int[] {0,1,1,1,2}, typeInfo);
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions());
+		
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"*"}, typeInfo);
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions());
+		
+		// this was a bug:
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"f2"}, typeInfo);
+		Assert.assertArrayEquals(new int[] {4}, fpk.computeLogicalKeyPositions());
+		
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"f0","f1.f0","f1.f1", "f1.f2", "f2"}, typeInfo);
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4}, fpk.computeLogicalKeyPositions());
+		
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"f0","f1.f0","f1.f1", "f2"}, typeInfo);
+		Assert.assertArrayEquals(new int[] {0,1,2,4}, fpk.computeLogicalKeyPositions());
+		
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"f2", "f0"}, typeInfo);
+		Assert.assertArrayEquals(new int[] {4,0}, fpk.computeLogicalKeyPositions());
+		
+		// duplicate elimination
+		fpk = new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(new String[] {"f2","f2","f2", "f0"}, typeInfo);
+		Assert.assertArrayEquals(new int[] {4,0}, fpk.computeLogicalKeyPositions());
+		
+		
+		TupleTypeInfo<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>> complexTypeInfo = new TupleTypeInfo<Tuple3<String,Tuple3<Tuple3<String, String, String>,String,String>,String>>(
+				BasicTypeInfo.STRING_TYPE_INFO,
+				new TupleTypeInfo<Tuple3<Tuple3<String, String, String>, String, String>>(new TupleTypeInfo<Tuple3<String, String, String>>(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO),
+				BasicTypeInfo.STRING_TYPE_INFO);
+		
+		ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>> complexFpk = 
+		new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new int[] {0}, complexTypeInfo);
+		Assert.assertArrayEquals(new int[] {0}, complexFpk.computeLogicalKeyPositions());
+		
+		complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new int[] {0,1,2}, complexTypeInfo);
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions());
+		
+		complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"*"}, complexTypeInfo);
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6}, complexFpk.computeLogicalKeyPositions());
+		
+		complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f1.f0.*"}, complexTypeInfo);
+		Assert.assertArrayEquals(new int[] {1,2,3}, complexFpk.computeLogicalKeyPositions());
+		
+		complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f2"}, complexTypeInfo);
+		Assert.assertArrayEquals(new int[] {6}, complexFpk.computeLogicalKeyPositions());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
index 62f1fad..c1cb249 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
@@ -22,82 +22,81 @@ package org.apache.flink.api.java.type.extractor;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Ignore;
+import org.junit.Assert;
 import org.junit.Test;
 
 @SuppressWarnings("unused")
 public class PojoTypeInformationTest {
 
-	static class SimplePojo {
-		String str;
-		Boolean Bl;
-		boolean bl;
-		Byte Bt;
-		byte bt;
-		Short Shrt;
-		short shrt;
-		Integer Intgr;
-		int intgr;
-		Long Lng;
-		long lng;
-		Float Flt;
-		float flt;
-		Double Dbl;
-		double dbl;
-		Character Ch;
-		char ch;
-		int[] primIntArray;
-		Integer[] intWrapperArray;
+	public static class SimplePojo {
+		public String str;
+		public Boolean Bl;
+		public boolean bl;
+		public Byte Bt;
+		public byte bt;
+		public Short Shrt;
+		public short shrt;
+		public Integer Intgr;
+		public int intgr;
+		public Long Lng;
+		public long lng;
+		public Float Flt;
+		public float flt;
+		public Double Dbl;
+		public double dbl;
+		public Character Ch;
+		public char ch;
+		public int[] primIntArray;
+		public Integer[] intWrapperArray;
 	}
 
-	@Ignore
 	@Test
 	public void testSimplePojoTypeExtraction() {
 		TypeInformation<SimplePojo> type = TypeExtractor.getForClass(SimplePojo.class);
-		assertTrue("Extracted type is not a Pojo type but should be.", type instanceof PojoTypeInfo);
+		assertTrue("Extracted type is not a composite/pojo type but should be.", type instanceof CompositeType);
 	}
 
-	static class NestedPojoInner {
-		private String field;
+	public static class NestedPojoInner {
+		public String field;
 	}
 
-	static class NestedPojoOuter {
-		private Integer intField;
-		NestedPojoInner inner;
+	public static class NestedPojoOuter {
+		public Integer intField;
+		public NestedPojoInner inner;
 	}
 
-	@Ignore
 	@Test
 	public void testNestedPojoTypeExtraction() {
 		TypeInformation<NestedPojoOuter> type = TypeExtractor.getForClass(NestedPojoOuter.class);
-		assertTrue("Extracted type is not a Pojo type but should be.", type instanceof PojoTypeInfo);
+		assertTrue("Extracted type is not a Pojo type but should be.", type instanceof CompositeType);
 	}
 
-	static class Recursive1Pojo {
-		private Integer intField;
-		Recursive2Pojo rec;
+	public static class Recursive1Pojo {
+		public Integer intField;
+		public Recursive2Pojo rec;
 	}
 
-	static class Recursive2Pojo {
-		private String strField;
-		Recursive1Pojo rec;
+	public static class Recursive2Pojo {
+		public String strField;
+		public Recursive1Pojo rec;
 	}
 
-	@Ignore
 	@Test
 	public void testRecursivePojoTypeExtraction() {
 		// This one tests whether a recursive pojo is detected using the set of visited
 		// types in the type extractor. The recursive field will be handled using the generic serializer.
 		TypeInformation<Recursive1Pojo> type = TypeExtractor.getForClass(Recursive1Pojo.class);
-		assertTrue("Extracted type is not a Pojo type but should be.", type instanceof PojoTypeInfo);
+		assertTrue("Extracted type is not a Pojo type but should be.", type instanceof CompositeType);
 	}
-
-	@Ignore
+	
 	@Test
 	public void testRecursivePojoObjectTypeExtraction() {
 		TypeInformation<Recursive1Pojo> type = TypeExtractor.getForObject(new Recursive1Pojo());
-		assertTrue("Extracted type is not a Pojo type but should be.", type instanceof PojoTypeInfo);
+		assertTrue("Extracted type is not a Pojo type but should be.", type instanceof CompositeType);
 	}
+	
 }


Mime
View raw message