flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [19/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:19 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
new file mode 100644
index 0000000..c0c7797
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.util.InstantiationUtil;
+
+
+public final class PojoComparator<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 1L;
+
+	// Reflection handles of the key fields; transient, so writeObject()/readObject() persist and restore them
+	private transient Field[] keyFields;
+
+	private final TypeComparator<Object>[] comparators; // comparators[i] is applied to the value of keyFields[i]
+
+	private final int[] normalizedKeyLengths; // normalized key length of each leading normalizable comparator
+
+	private final int numLeadingNormalizableKeys; // how many leading keys contribute to the normalized key
+
+	private final int normalizableKeyPrefixLen; // summed normalized key length; Integer.MAX_VALUE on overflow
+
+	private final boolean invertNormKey; // inversion direction, decided by the first comparator
+
+	private TypeSerializer<T> serializer; // used by compareSerialized() to materialize records
+
+	private final Class<T> type;
+
+	@SuppressWarnings("unchecked") // for the cast of the comparator array to TypeComparator<Object>[]
+	public PojoComparator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, Class<T> type) {
+		this.keyFields = keyFields;
+		this.comparators = (TypeComparator<Object>[]) comparators;
+
+		this.type = type;
+		this.serializer = serializer;
+
+		// set up auxiliary fields for normalized key support
+		this.normalizedKeyLengths = new int[keyFields.length];
+		int nKeys = 0;
+		int nKeyLen = 0;
+		boolean inverted = false;
+
+		for (int i = 0; i < this.comparators.length; i++) { // NOTE(review): also indexes keyFields[i]; presumably both arrays must have equal length
+			TypeComparator<?> k = this.comparators[i];
+			if(k == null) {
+				throw new IllegalArgumentException("One of the passed comparators is null");
+			}
+			if(keyFields[i] == null) {
+				throw new IllegalArgumentException("One of the passed reflection fields is null");
+			}
+
+			// as long as the leading keys support normalized keys, we can build up the composite key
+			if (k.supportsNormalizedKey()) {
+				if (i == 0) {
+					// the first comparator decides whether we need to invert the key direction
+					inverted = k.invertNormalizedKey();
+				}
+				else if (k.invertNormalizedKey() != inverted) {
+					// if a successor does not agree on the inversion direction, it cannot be part of the normalized key
+					break;
+				}
+
+				nKeys++;
+				final int len = k.getNormalizeKeyLen();
+				if (len < 0) {
+					throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
+				}
+				this.normalizedKeyLengths[i] = len;
+				nKeyLen += this.normalizedKeyLengths[i];
+
+				if (nKeyLen < 0) {
+					// int overflow of the summed length: we are out of budget for normalized key space anyway
+					nKeyLen = Integer.MAX_VALUE;
+					break;
+				}
+			} else {
+				break; // the first non-normalizable key ends the normalized prefix (remaining keys are not validated here)
+			}
+		}
+		this.numLeadingNormalizableKeys = nKeys;
+		this.normalizableKeyPrefixLen = nKeyLen;
+		this.invertNormKey = inverted;
+	}
+
+	@SuppressWarnings("unchecked") // generic array creation and the cast of the deserialized serializer
+	private PojoComparator(PojoComparator<T> toClone) { // copy constructor backing duplicate()
+		this.keyFields = toClone.keyFields; // the Field array is shared with the original, not copied
+		this.comparators = new TypeComparator[toClone.comparators.length];
+
+		for (int i = 0; i < toClone.comparators.length; i++) {
+			this.comparators[i] = toClone.comparators[i].duplicate(); // each nested comparator is duplicated individually
+		}
+
+		this.normalizedKeyLengths = toClone.normalizedKeyLengths; // shared array; this class only writes it in the public constructor
+		this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
+		this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
+		this.invertNormKey = toClone.invertNormKey;
+
+		this.type = toClone.type;
+
+		try { // the serializer is copied by a Java-serialization round trip
+			this.serializer = (TypeSerializer<T>) InstantiationUtil.deserializeObject(
+					InstantiationUtil.serializeObject(toClone.serializer), Thread.currentThread().getContextClassLoader());
+		} catch (IOException e) {
+			throw new RuntimeException("Cannot copy serializer", e);
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Cannot copy serializer", e);
+		}
+	}
+
+	private void writeObject(ObjectOutputStream out) // Java serialization hook: default fields, then the key fields
+			throws IOException {
+		out.defaultWriteObject();
+		out.writeInt(keyFields.length);
+		for (Field field: keyFields) {
+			out.writeObject(field.getDeclaringClass()); // keyFields is transient: each Field is written as
+			out.writeUTF(field.getName());              // (declaring class, name) and looked up again in readObject()
+		}
+	}
+
+	private void readObject(ObjectInputStream in) // Java serialization hook: mirrors writeObject()
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		int numKeyFields = in.readInt();
+		keyFields = new Field[numKeyFields];
+		for (int i = 0; i < numKeyFields; i++) {
+			Class<?> clazz = (Class<?>) in.readObject();
+			String fieldName = in.readUTF();
+			// the field is looked up by name, walking up the superclass chain until it is found
+			while (clazz != null) {
+				try {
+					Field field = clazz.getDeclaredField(fieldName);
+					field.setAccessible(true);
+					keyFields[i] = field;
+					break;
+				} catch (NoSuchFieldException e) {
+					clazz = clazz.getSuperclass(); // not declared here, try the parent (null ends the search)
+				}
+			}
+			if (keyFields[i] == null ) { // no class in the hierarchy declares the field
+				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+						+ " (" + fieldName + ")");
+			}
+		}
+	}
+
+	public Field[] getKeyFields() { // returns the internal array itself, not a copy
+		return this.keyFields;
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public void getFlatComparator(List<TypeComparator> flatComparators) {
+		for (TypeComparator<Object> nested : comparators) { // appends in key order, expanding composite comparators
+			if (!(nested instanceof CompositeTypeComparator)) {
+				flatComparators.add(nested);
+			} else {
+				((CompositeTypeComparator) nested).getFlatComparator(flatComparators);
+			}
+		}
+	}
+	
+	/**
+	 * Reads {@code field} from {@code object} and returns its value; reflection failures are rethrown unchecked.
+	 */
+	public final Object accessField(Field field, Object object) {
+		try {
+			object = field.get(object);
+		} catch (NullPointerException npex) { // Field.get() on a null target: reported as a null key field
+			throw new NullKeyFieldException("Unable to access field "+field+" on object "+object);
+		} catch (IllegalAccessException iaex) { // keep the cause so the failing access can be diagnosed
+			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo."
+			+ " fields: " + field + " obj: " + object, iaex);
+		}
+		return object;
+	}
+
+	@Override
+	public int hash(T value) { // combines the hashes of all key fields, in key order
+		int i = 0;
+		int code = 0;
+		for (; i < this.keyFields.length; i++) {
+			code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; // salt index is masked to 0..31
+			try {
+				code += this.comparators[i].hash(accessField(keyFields[i], value));
+			}catch(NullPointerException npe) { // rethrown with the name of the key field being hashed
+				throw new RuntimeException("A NullPointerException occured while accessing a key field in a POJO. " +
+						"Most likely, the value grouped/joined on is null. Field name: "+keyFields[i].getName(), npe);
+			}
+		}
+		return code;
+
+	}
+
+	@Override
+	public void setReference(T toCompare) {
+		for (int k = 0; k < keyFields.length; k++) {
+			final TypeComparator<Object> keyComparator = comparators[k]; // each key comparator holds its own reference value
+			keyComparator.setReference(accessField(keyFields[k], toCompare));
+		}
+	}
+
+	@Override
+	public boolean equalToReference(T candidate) {
+		for (int k = 0; k < keyFields.length; k++) {
+			final TypeComparator<Object> keyComparator = comparators[k];
+			if (!keyComparator.equalToReference(accessField(keyFields[k], candidate))) {
+				return false; // a single differing key field is enough
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) { // compares the stored references key by key
+		PojoComparator<T> other = (PojoComparator<T>) referencedComparator; // NOTE(review): presumably always another PojoComparator; anything else fails this cast
+
+		int i = 0; // declared outside the try so the catch block can name the failing key
+		try {
+			for (; i < this.keyFields.length; i++) {
+				int cmp = this.comparators[i].compareToReference(other.comparators[i]);
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+			return 0;
+		}
+		catch (NullPointerException npex) {
+			throw new NullKeyFieldException(this.keyFields[i].toString());
+		}
+	}
+
+	@Override
+	public int compare(T first, T second) {
+		for (int k = 0; k < keyFields.length; k++) {
+			final TypeComparator<Object> keyComparator = comparators[k];
+			final Object left = accessField(keyFields[k], first);
+			final Object right = accessField(keyFields[k], second);
+			final int result = keyComparator.compare(left, right);
+			if (result != 0) {
+				return result; // the first differing key field decides the order
+			}
+		}
+		return 0;
+	}
+
+	
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		T first = this.serializer.createInstance();
+		T second = this.serializer.createInstance();
+		// both records are fully deserialized, then compared as objects via compare()
+		first = this.serializer.deserialize(first, firstSource);
+		second = this.serializer.deserialize(second, secondSource);
+
+		return this.compare(first, second);
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() { // true when at least the first key field is normalizable
+		return this.numLeadingNormalizableKeys > 0;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() { // summed length of the leading normalizable keys (Integer.MAX_VALUE on overflow)
+		return this.normalizableKeyPrefixLen;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) { // true when keyBytes cannot hold a normalized key covering every key field
+		return this.numLeadingNormalizableKeys < this.keyFields.length || // some key fields are not part of the normalized key
+				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||     // the summed length overflowed in the constructor
+				this.normalizableKeyPrefixLen > keyBytes;                 // fewer bytes available than the full prefix needs
+	}
+
+	@Override
+	public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
+		// write the leading normalizable keys back to back until the byte budget (numBytes) is used up
+		for (int i = 0; i < this.numLeadingNormalizableKeys && numBytes > 0; i++)
+		{
+			// a key gets its full normalized length, or whatever budget is left if that is smaller
+			final int len = Math.min(this.normalizedKeyLengths[i], numBytes);
+			this.comparators[i].putNormalizedKey(accessField(keyFields[i], value), target, offset, len);
+			numBytes -= len;
+			offset += len;
+		}
+	}
+
+	@Override
+	public boolean invertNormalizedKey() { // direction taken from the first comparator in the constructor
+		return this.invertNormKey;
+	}
+
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() { // not supported; both methods below throw
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { // always throws
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { // always throws
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public PojoComparator<T> duplicate() { // always a new instance, built by the private copy constructor
+		return new PojoComparator<T>(this);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		int written = 0; // number of keys the nested comparators reported so far
+		for (int pos = 0; pos < comparators.length; pos++) {
+			written += comparators[pos].extractKeys(accessField(keyFields[pos], record), target, index + written);
+		}
+		return written;
+	}
+
+	// --------------------------------------------------------------------------------------------
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
new file mode 100644
index 0000000..de24956
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+
+public final class PojoSerializer<T> extends TypeSerializer<T> {
+
+	// Bit flags of the header byte written in front of every record (constants, hence final)
+	private static final byte IS_NULL = 1;
+	private static final byte NO_SUBCLASS = 2;
+	private static final byte IS_SUBCLASS = 4;
+	private static final byte IS_TAGGED_SUBCLASS = 8;
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<T> clazz; // the exact class this serializer handles itself (subclasses are delegated)
+
+	private final TypeSerializer<Object>[] fieldSerializers; // fieldSerializers[i] handles fields[i]
+
+	private final int numFields; // fieldSerializers.length
+
+	private final Map<Class<?>, Integer> registeredClasses; // registered subclass -> tag (index into registeredSerializers)
+
+	private final TypeSerializer<?>[] registeredSerializers;
+
+	private final ExecutionConfig executionConfig;
+
+	private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache; // serializers created on demand for unregistered subclasses
+	private transient ClassLoader cl; // context class loader captured at construction / deserialization
+	// Field is not Serializable: the fields are written and restored by writeObject()/readObject()
+	private transient Field[] fields;
+
+	@SuppressWarnings("unchecked") // for the cast of the serializer array to TypeSerializer<Object>[]
+	public PojoSerializer(
+			Class<T> clazz,
+			TypeSerializer<?>[] fieldSerializers,
+			Field[] fields,
+			ExecutionConfig executionConfig) {
+
+		this.clazz = Preconditions.checkNotNull(clazz);
+		this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
+		this.fields = Preconditions.checkNotNull(fields);
+		this.numFields = fieldSerializers.length;
+		this.executionConfig = Preconditions.checkNotNull(executionConfig);
+
+		LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
+
+		for (int i = 0; i < numFields; i++) { // NOTE(review): numFields is fieldSerializers.length; fields is presumably of the same length
+			this.fields[i].setAccessible(true);
+		}
+
+		cl = Thread.currentThread().getContextClassLoader();
+
+		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
+
+		// We only want those classes that are not our own class and are actually sub-classes.
+		List<Class<?>> cleanedTaggedClasses = new ArrayList<Class<?>>(registeredPojoTypes.size());
+		for (Class<?> registeredClass: registeredPojoTypes) {
+			if (registeredClass.equals(clazz)) {
+				continue;
+			}
+			if (!clazz.isAssignableFrom(registeredClass)) {
+				continue;
+			}
+			cleanedTaggedClasses.add(registeredClass);
+
+		}
+		this.registeredClasses = new LinkedHashMap<Class<?>, Integer>(cleanedTaggedClasses.size());
+		registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
+
+		int id = 0; // tags are assigned in iteration order of the registered types
+		for (Class<?> registeredClass: cleanedTaggedClasses) {
+			this.registeredClasses.put(registeredClass, id);
+			TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(registeredClass);
+			registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
+
+			id++;
+		}
+	}
+
+	private void writeObject(ObjectOutputStream out) // Java serialization hook: default fields, then the POJO fields
+			throws IOException {
+		out.defaultWriteObject();
+		out.writeInt(fields.length);
+		for (Field field: fields) {
+			out.writeObject(field.getDeclaringClass()); // fields is transient: each Field is written as
+			out.writeUTF(field.getName());              // (declaring class, name) and looked up again in readObject()
+		}
+	}
+
+	private void readObject(ObjectInputStream in) // Java serialization hook: mirrors writeObject()
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		int numFields = in.readInt(); // local count read from the stream; shadows the instance field of the same name
+		fields = new Field[numFields];
+		for (int i = 0; i < numFields; i++) {
+			Class<?> clazz = (Class<?>)in.readObject(); // declaring class of the field; shadows the instance field clazz
+			String fieldName = in.readUTF();
+			fields[i] = null;
+			// the field is looked up by name, walking up the superclass chain until it is found
+			while (clazz != null) {
+				try {
+					fields[i] = clazz.getDeclaredField(fieldName);
+					fields[i].setAccessible(true);
+					break;
+				} catch (NoSuchFieldException e) {
+					clazz = clazz.getSuperclass(); // not declared here, try the parent (null ends the search)
+				}
+			}
+			if (fields[i] == null) { // no class in the hierarchy declares the field
+				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+						+ " (" + fieldName + ")");
+			}
+		}
+		// re-create the remaining transient state
+		cl = Thread.currentThread().getContextClassLoader();
+		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
+	}
+
+	private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
+		// repeated lookups for the same subclass are served from the cache
+		final TypeSerializer<?> cached = subclassSerializerCache.get(subclass);
+		if (cached != null) {
+			return cached;
+		}
+		// first encounter: derive the serializer from the extracted type information
+		final TypeInformation<?> subclassInfo = TypeExtractor.createTypeInfo(subclass);
+		final TypeSerializer<?> created = subclassInfo.createSerializer(executionConfig);
+		if (created instanceof PojoSerializer) {
+			((PojoSerializer<?>) created).copyBaseFieldOrder(this);
+		}
+		subclassSerializerCache.put(subclass, created);
+		return created;
+	}
+
+	@SuppressWarnings("unused") // no caller within this class
+	private boolean hasField(Field f) { // true if f equals one of this serializer's fields
+		for (Field field: fields) {
+			if (f.equals(field)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
+		// Intentionally a no-op for now. In the future this should adapt the subclass serializer
+		// to use the same field ordering as the base class serializer, so that binary comparison
+		// on base class fields can work.
+	}
+	
+	@Override
+	public boolean isImmutableType() { // always false for POJOs
+		return false;
+	}
+
+	@Override
+	public PojoSerializer<T> duplicate() { // returns this unless a field serializer produced a distinct duplicate
+		boolean stateful = false;
+		TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
+
+		for (int i = 0; i < fieldSerializers.length; i++) {
+			duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
+			if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+				// a serializer that does not return itself from duplicate() is treated as stateful
+				stateful = true;
+			}
+		}
+
+		if (stateful) { // new instance over the duplicated field serializers; the constructor rebuilds the registered serializers
+			return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
+		} else {
+			return this;
+		}
+	}
+
+	
+	@Override
+	public T createInstance() { // new instance via the no-arg constructor, with all fields pre-initialized
+		if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
+			return null; // interfaces and abstract classes cannot be instantiated
+		}
+		try {
+			T t = clazz.newInstance();
+			initializeFields(t);
+			return t;
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate class.", e);
+		}
+	}
+
+	protected void initializeFields(T t) { // sets every field of t to a fresh instance from its field serializer
+		for (int i = 0; i < numFields; i++) {
+			try {
+				fields[i].set(t, fieldSerializers[i].createInstance());
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException("Cannot initialize fields.", e);
+			}
+		}
+	}
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public T copy(T from) { // deep copy into a new instance; null stays null
+		if (from == null) {
+			return null;
+		}
+
+		Class<?> actualType = from.getClass();
+		if (actualType == clazz) {
+			T target;
+			try {
+				target = (T) from.getClass().newInstance();
+			}
+			catch (Throwable t) {
+				throw new RuntimeException("Cannot instantiate class.", t);
+			}
+			// no subclass
+			try {
+				for (int i = 0; i < numFields; i++) {
+					Object value = fields[i].get(from);
+					if (value != null) {
+						Object copy = fieldSerializers[i].copy(value);
+						fields[i].set(target, copy);
+					}
+					else {
+						fields[i].set(target, null);
+					}
+				}
+			} catch (IllegalAccessException e) {
+				// the fields were made accessible up front, so this is unexpected; keep the cause for diagnosis
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
+			}
+			return target;
+		} else {
+			// subclass
+			TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
+			return (T) subclassSerializer.copy(from);
+		}
+	}
+	
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public T copy(T from, T reuse) { // deep copy into reuse when its class matches from's, else into a new instance
+		if (from == null) {
+			return null;
+		}
+
+		Class<?> actualType = from.getClass();
+		if (reuse == null || actualType != reuse.getClass()) {
+			// cannot reuse, do a non-reuse copy
+			return copy(from);
+		}
+
+		if (actualType == clazz) {
+			try {
+				for (int i = 0; i < numFields; i++) {
+					Object value = fields[i].get(from);
+					if (value != null) {
+						Object reuseValue = fields[i].get(reuse);
+						Object copy;
+						if(reuseValue != null) {
+							copy = fieldSerializers[i].copy(value, reuseValue);
+						}
+						else {
+							copy = fieldSerializers[i].copy(value);
+						}
+						fields[i].set(reuse, copy);
+					}
+					else {
+						fields[i].set(reuse, null);
+					}
+				}
+			} catch (IllegalAccessException e) { // unexpected, fields were made accessible up front; keep the cause
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
+			}
+		} else {
+			TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
+			reuse = (T) subclassSerializer.copy(from, reuse);
+		}
+
+		return reuse;
+	}
+
+	@Override
+	public int getLength() { // always -1; NOTE(review): presumably the "variable length" marker of TypeSerializer - confirm
+		return -1;
+	}
+
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void serialize(T value, DataOutputView target) throws IOException {
+		int flags = 0;
+		// handle null values
+		if (value == null) {
+			flags |= IS_NULL;
+			target.writeByte(flags);
+			return;
+		}
+
+		Integer subclassTag = -1;
+		Class<?> actualClass = value.getClass();
+		TypeSerializer subclassSerializer = null;
+		if (clazz != actualClass) {
+			subclassTag = registeredClasses.get(actualClass); // null when the subclass is not registered
+			if (subclassTag != null) {
+				flags |= IS_TAGGED_SUBCLASS;
+				subclassSerializer = registeredSerializers[subclassTag];
+			} else {
+				flags |= IS_SUBCLASS;
+				subclassSerializer = getSubclassSerializer(actualClass);
+			}
+		} else {
+			flags |= NO_SUBCLASS;
+		}
+
+		target.writeByte(flags);
+
+		if ((flags & IS_SUBCLASS) != 0) {
+			target.writeUTF(actualClass.getName());
+		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+			target.writeByte(subclassTag);
+		}
+
+		// payload: own fields (each behind a null marker) for the exact class, else the subclass serializer's output
+		if ((flags & NO_SUBCLASS) != 0) {
+			try {
+				for (int i = 0; i < numFields; i++) {
+					Object o = fields[i].get(value);
+					if (o == null) {
+						target.writeBoolean(true); // null field handling
+					} else {
+						target.writeBoolean(false);
+						fieldSerializers[i].serialize(o, target);
+					}
+				}
+			} catch (IllegalAccessException e) {
+				// the fields were made accessible up front, so this is unexpected; keep the cause for diagnosis
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
+			}
+		} else {
+			// subclass
+			if (subclassSerializer != null) {
+				subclassSerializer.serialize(value, target);
+			}
+		}
+	}
+
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public T deserialize(DataInputView source) throws IOException {
+		int flags = source.readByte(); // header byte written by serialize()
+		if((flags & IS_NULL) != 0) {
+			return null;
+		}
+
+		T target;
+
+		Class<?> actualSubclass = null;
+		TypeSerializer subclassSerializer = null;
+
+		if ((flags & IS_SUBCLASS) != 0) {
+			String subclassName = source.readUTF();
+			try {
+				actualSubclass = Class.forName(subclassName, true, cl);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot instantiate class.", e);
+			}
+			subclassSerializer = getSubclassSerializer(actualSubclass);
+			target = (T) subclassSerializer.createInstance();
+			// also initialize fields for which the subclass serializer is not responsible
+			initializeFields(target);
+		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+
+			int subclassTag = source.readByte();
+			subclassSerializer = registeredSerializers[subclassTag];
+			target = (T) subclassSerializer.createInstance();
+			// also initialize fields for which the subclass serializer is not responsible
+			initializeFields(target);
+		} else {
+			target = createInstance();
+		}
+
+		if ((flags & NO_SUBCLASS) != 0) {
+			try {
+				for (int i = 0; i < numFields; i++) {
+					boolean isNull = source.readBoolean();
+					if (isNull) {
+						fields[i].set(target, null);
+					} else {
+						Object field = fieldSerializers[i].deserialize(source);
+						fields[i].set(target, field);
+					}
+				}
+			} catch (IllegalAccessException e) {
+				// the fields were made accessible up front, so this is unexpected; keep the cause for diagnosis
+				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
+			}
+		} else {
+			if (subclassSerializer != null) {
+				target = (T) subclassSerializer.deserialize(target, source);
+			}
+		}
+		return target;
+	}
+	
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		// reads one record into reuse when its class matches the record's class, else into a fresh instance
+		// handle null values
+		int flags = source.readByte();
+		if((flags & IS_NULL) != 0) {
+			return null;
+		}
+
+		Class<?> subclass = null;
+		TypeSerializer subclassSerializer = null;
+		if ((flags & IS_SUBCLASS) != 0) {
+			String subclassName = source.readUTF();
+			try {
+				subclass = Class.forName(subclassName, true, cl);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot instantiate class.", e);
+			}
+			subclassSerializer = getSubclassSerializer(subclass);
+
+			if (reuse == null || subclass != reuse.getClass()) {
+				// cannot reuse
+				reuse = (T) subclassSerializer.createInstance();
+				// also initialize fields for which the subclass serializer is not responsible
+				initializeFields(reuse);
+			}
+		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+			int subclassTag = source.readByte();
+			subclassSerializer = registeredSerializers[subclassTag];
+			// NOTE(review): the cast below assumes every registered serializer is a PojoSerializer - confirm
+			if (reuse == null || ((PojoSerializer)subclassSerializer).clazz != reuse.getClass()) {
+				// cannot reuse
+				reuse = (T) subclassSerializer.createInstance();
+				// also initialize fields for which the subclass serializer is not responsible
+				initializeFields(reuse);
+			}
+		} else {
+			if (reuse == null || clazz != reuse.getClass()) {
+				reuse = createInstance();
+			}
+		}
+
+		if ((flags & NO_SUBCLASS) != 0) {
+			try {
+				for (int i = 0; i < numFields; i++) {
+					boolean isNull = source.readBoolean();
+					if (isNull) {
+						fields[i].set(reuse, null);
+					} else {
+						Object field;
+						// hand the current field value to the field serializer for reuse, if there is one
+						Object reuseField = fields[i].get(reuse);
+						if(reuseField != null) {
+							field = fieldSerializers[i].deserialize(reuseField, source);
+						}
+						else {
+							field = fieldSerializers[i].deserialize(source);
+						}
+
+						fields[i].set(reuse, field);
+					}
+				}
+			} catch (IllegalAccessException e) { // unexpected, fields were made accessible up front; keep the cause
+				throw new RuntimeException(
+						"Error during POJO copy, this should not happen since we check the fields before.", e);
+			}
+		} else {
+			if (subclassSerializer != null) {
+				reuse = (T) subclassSerializer.deserialize(reuse, source);
+			}
+		}
+
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		// copy the flags (the record is copied in its serialized form, header first)
+		int flags = source.readByte();
+		target.writeByte(flags);
+
+		if ((flags & IS_NULL) != 0) {
+			// is a null value, nothing further to copy
+			return;
+		}
+
+		TypeSerializer<?> subclassSerializer = null;
+		if ((flags & IS_SUBCLASS) != 0) {
+			String className = source.readUTF();
+			target.writeUTF(className);
+			try { // NOTE(review): resolves with the current context class loader, while deserialize() uses the stored cl - confirm this is intended
+				Class<?> subclass = Class.forName(className, true, Thread.currentThread()
+						.getContextClassLoader());
+				subclassSerializer = getSubclassSerializer(subclass);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot instantiate class.", e);
+			}
+		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+			int subclassTag = source.readByte();
+			target.writeByte(subclassTag);
+			subclassSerializer = registeredSerializers[subclassTag];
+		}
+
+		if ((flags & NO_SUBCLASS) != 0) {
+			for (int i = 0; i < numFields; i++) {
+				boolean isNull = source.readBoolean(); // per-field null marker written by serialize()
+				target.writeBoolean(isNull);
+				if (!isNull) {
+					fieldSerializers[i].copy(source, target);
+				}
+			}
+		} else {
+			if (subclassSerializer != null) {
+				subclassSerializer.copy(source, target);
+			}
+		}
+	}
+	
+	@Override
+	public int hashCode() { // built from the same five members that equals() compares
+		return 31 * (31 * Arrays.hashCode(fieldSerializers) + Arrays.hashCode(registeredSerializers)) +
+			Objects.hash(clazz, numFields, registeredClasses);
+	}
+	
+	@Override
+	public boolean equals(Object obj) { // equal when class, field/registered serializers and registered classes all match
+		if (obj instanceof PojoSerializer) {
+			PojoSerializer<?> other = (PojoSerializer<?>) obj;
+
+			return other.canEqual(this) && // gives the other object the chance to reject the comparison
+				clazz == other.clazz &&
+				Arrays.equals(fieldSerializers, other.fieldSerializers) &&
+				Arrays.equals(registeredSerializers, other.registeredSerializers) &&
+				numFields == other.numFields &&
+				registeredClasses.equals(other.registeredClasses);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) { // used by equals(): only other PojoSerializer instances qualify
+		return obj instanceof PojoSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
new file mode 100644
index 0000000..4b734a7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * A {@link TypeComparatorFactory} that wraps an existing {@link TypeComparator} instance.
+ *
+ * <p>The comparator is stored in a {@link Configuration} under a fixed key by
+ * {@link #writeParametersToConfig(Configuration)} and restored from it by
+ * {@link #readParametersFromConfig(Configuration, ClassLoader)}.
+ *
+ * @param <T> The type handled by the comparator.
+ */
+public final class RuntimeComparatorFactory<T> implements TypeComparatorFactory<T>, java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Configuration key under which the comparator is stored. */
+	private static final String CONFIG_KEY = "SER_DATA";
+
+	/** The wrapped comparator; {@code null} until set by the constructor or read from a configuration. */
+	private TypeComparator<T> comparator;
+
+	/**
+	 * Creates an uninitialized factory. The comparator has to be supplied afterwards through
+	 * {@link #readParametersFromConfig(Configuration, ClassLoader)}.
+	 */
+	public RuntimeComparatorFactory() {}
+
+	/**
+	 * Creates a factory around the given comparator.
+	 *
+	 * @param comparator The comparator handed out by {@link #createComparator()}.
+	 */
+	public RuntimeComparatorFactory(TypeComparator<T> comparator) {
+		this.comparator = comparator;
+	}
+
+	/**
+	 * Writes the wrapped comparator into the given configuration.
+	 *
+	 * @param config The configuration to write to.
+	 * @throws RuntimeException If the comparator could not be written.
+	 */
+	@Override
+	public void writeParametersToConfig(Configuration config) {
+		try {
+			InstantiationUtil.writeObjectToConfig(comparator, config, CONFIG_KEY);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not serialize comparator into the configuration.", e);
+		}
+	}
+
+	/**
+	 * Reads the comparator from the given configuration and keeps it in this factory.
+	 *
+	 * @param config The configuration to read from.
+	 * @param cl The class loader passed on to the object reader.
+	 * @throws ClassNotFoundException Rethrown unchanged if reading fails with it.
+	 * @throws RuntimeException If reading fails for any other reason.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+		try {
+			comparator = (TypeComparator<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY, cl);
+		}
+		catch (ClassNotFoundException e) {
+			// must be caught separately, otherwise the generic handler below would wrap it
+			throw e;
+		}
+		catch (Exception e) {
+			// this method loads a comparator; the message now says so
+			throw new RuntimeException("Could not load comparator from the configuration.", e);
+		}
+	}
+
+	/**
+	 * Returns the wrapped comparator. The same instance is returned on every call;
+	 * no copy is made.
+	 *
+	 * @return The wrapped comparator.
+	 * @throws RuntimeException If no comparator has been set yet.
+	 */
+	@Override
+	public TypeComparator<T> createComparator() {
+		if (comparator != null) {
+			return comparator;
+		} else {
+			throw new RuntimeException("ComparatorFactory has not been initialized from configuration.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
new file mode 100644
index 0000000..31e28f7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+
+/**
+ * A {@link TypePairComparatorFactory} that builds {@link GenericPairComparator}s
+ * from the two type comparators it is given.
+ *
+ * @param <T1> The first data type.
+ * @param <T2> The second data type.
+ */
+public final class RuntimePairComparatorFactory<T1, T2>
+		implements TypePairComparatorFactory<T1, T2>, java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a pair comparator with the first type's comparator in first position.
+	 *
+	 * @param comparator1 The comparator for the first type.
+	 * @param comparator2 The comparator for the second type.
+	 * @return A new {@link GenericPairComparator} built from (comparator1, comparator2).
+	 */
+	@Override
+	public TypePairComparator<T1, T2> createComparator12(
+			TypeComparator<T1> comparator1,
+			TypeComparator<T2> comparator2) {
+		final TypePairComparator<T1, T2> pairComparator =
+				new GenericPairComparator<T1, T2>(comparator1, comparator2);
+		return pairComparator;
+	}
+
+	/**
+	 * Creates a pair comparator with the second type's comparator in first position.
+	 *
+	 * @param comparator1 The comparator for the first type.
+	 * @param comparator2 The comparator for the second type.
+	 * @return A new {@link GenericPairComparator} built from (comparator2, comparator1).
+	 */
+	@Override
+	public TypePairComparator<T2, T1> createComparator21(
+			TypeComparator<T1> comparator1,
+			TypeComparator<T2> comparator2) {
+		final TypePairComparator<T2, T1> pairComparator =
+				new GenericPairComparator<T2, T1>(comparator2, comparator1);
+		return pairComparator;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
new file mode 100644
index 0000000..96aff73
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * A {@link TypeSerializerFactory} that wraps an existing {@link TypeSerializer} together
+ * with the class of the data type it handles.
+ *
+ * <p>Both the class and the serializer are stored in a {@link Configuration} by
+ * {@link #writeParametersToConfig(Configuration)} and restored from it by
+ * {@link #readParametersFromConfig(Configuration, ClassLoader)}.
+ *
+ * @param <T> The type handled by the serializer.
+ */
+public final class RuntimeSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Configuration key under which the serializer is stored. */
+	private static final String CONFIG_KEY_SER = "SER_DATA";
+
+	/** Configuration key under which the data type class is stored. */
+	private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
+
+	/** The wrapped serializer; {@code null} until set by the constructor or read from a configuration. */
+	private TypeSerializer<T> serializer;
+
+	/** True while the wrapped serializer instance itself has not been handed out yet. */
+	private boolean firstSerializer = true;
+
+	/** The class of the data type; {@code null} until set by the constructor or read from a configuration. */
+	private Class<T> clazz;
+
+	/**
+	 * Creates an uninitialized factory.
+	 * Needed because we read the class from the TaskConfig and instantiate ourselves.
+	 */
+	public RuntimeSerializerFactory() {}
+
+	/**
+	 * Creates a factory around the given serializer and data type class.
+	 *
+	 * @param serializer The serializer to hand out.
+	 * @param clazz The class of the data type.
+	 * @throws NullPointerException If either argument is {@code null}.
+	 */
+	public RuntimeSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) {
+		if (serializer == null || clazz == null) {
+			throw new NullPointerException("The serializer and the class must not be null.");
+		}
+
+		this.clazz = clazz;
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Writes the data type class and the serializer into the given configuration.
+	 *
+	 * @param config The configuration to write to.
+	 * @throws RuntimeException If writing fails.
+	 */
+	@Override
+	public void writeParametersToConfig(Configuration config) {
+		try {
+			InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS);
+			InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not serialize serializer into the configuration.", e);
+		}
+	}
+
+	/**
+	 * Reads the data type class and the serializer from the given configuration. Afterwards
+	 * the next call to {@link #getSerializer()} returns the freshly read instance itself.
+	 *
+	 * @param config The configuration to read from.
+	 * @param cl The class loader passed on to the object reader.
+	 * @throws NullPointerException If either argument is {@code null}.
+	 * @throws ClassNotFoundException Rethrown unchanged if reading fails with it.
+	 * @throws RuntimeException If reading fails for any other reason.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+		if (config == null || cl == null) {
+			throw new NullPointerException();
+		}
+
+		try {
+			this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
+			this.serializer = (TypeSerializer<T>)  InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
+			firstSerializer = true;
+		}
+		catch (ClassNotFoundException e) {
+			// must be caught separately, otherwise the generic handler below would wrap it
+			throw e;
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not load deserializer from the configuration.", e);
+		}
+	}
+
+	/**
+	 * Returns a serializer. The first call after construction (or after reading from a
+	 * configuration) returns the wrapped instance itself; every later call returns the
+	 * result of {@link TypeSerializer#duplicate()} on it.
+	 *
+	 * @return The wrapped serializer or a duplicate of it.
+	 * @throws RuntimeException If no serializer has been set yet.
+	 */
+	@Override
+	public TypeSerializer<T> getSerializer() {
+		if (this.serializer != null) {
+			if (firstSerializer) {
+				firstSerializer = false;
+				return this.serializer;
+			} else {
+				return this.serializer.duplicate();
+			}
+		} else {
+			throw new RuntimeException("SerializerFactory has not been initialized from configuration.");
+		}
+	}
+
+	/**
+	 * @return The class of the data type, or {@code null} if the factory is uninitialized.
+	 */
+	@Override
+	public Class<T> getDataType() {
+		return clazz;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Combines the hash codes of the data type class and the serializer. A missing
+	 * (null) component contributes zero, so an uninitialized factory can be hashed.
+	 */
+	@Override
+	public int hashCode() {
+		final int classHash = clazz == null ? 0 : clazz.hashCode();
+		final int serializerHash = serializer == null ? 0 : serializer.hashCode();
+		return classHash ^ serializerHash;
+	}
+
+	/**
+	 * Two factories are equal if they refer to the same data type class and hold equal
+	 * serializers. Uninitialized factories (null class and serializer) compare without
+	 * throwing.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		// instanceof already rejects null
+		if (!(obj instanceof RuntimeSerializerFactory)) {
+			return false;
+		}
+
+		final RuntimeSerializerFactory<?> other = (RuntimeSerializerFactory<?>) obj;
+		if (this.clazz != other.clazz) {
+			return false;
+		}
+		return this.serializer == null
+				? other.serializer == null
+				: this.serializer.equals(other.serializer);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
new file mode 100644
index 0000000..a06ff1a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Serializer for {@link Tuple0}. It is a singleton ({@link #INSTANCE}), always works with
+ * {@link Tuple0#INSTANCE} and represents a tuple as a single byte on the wire.
+ */
+public class Tuple0Serializer extends TupleSerializer<Tuple0> {
+
+	private static final long serialVersionUID = 1278813169022975971L;
+
+	/** The only instance of this serializer. */
+	public static final Tuple0Serializer INSTANCE = new Tuple0Serializer();
+
+	// ------------------------------------------------------------------------
+
+	/** Private, instances are obtained through {@link #INSTANCE}. */
+	private Tuple0Serializer() {
+		super(Tuple0.class, new TypeSerializer<?>[0]);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Returns this very instance; no copy is created. */
+	@Override
+	public Tuple0Serializer duplicate() {
+		return this;
+	}
+
+	/** Returns the shared {@link Tuple0#INSTANCE}. */
+	@Override
+	public Tuple0 createInstance() {
+		return Tuple0.INSTANCE;
+	}
+
+	/**
+	 * Returns the shared {@link Tuple0#INSTANCE} if no field values are given.
+	 *
+	 * @param fields The field values; must be {@code null} or empty.
+	 * @throws UnsupportedOperationException If at least one field value is passed.
+	 */
+	@Override
+	public Tuple0 createInstance(Object[] fields) {
+		final boolean hasData = fields != null && fields.length != 0;
+		if (hasData) {
+			throw new UnsupportedOperationException(
+					"Tuple0 cannot take any data, as it has zero fields.");
+		}
+
+		return Tuple0.INSTANCE;
+	}
+
+	/** Returns the given tuple itself. */
+	@Override
+	public Tuple0 copy(Tuple0 from) {
+		return from;
+	}
+
+	/** Returns the given reuse object; {@code from} is not looked at. */
+	@Override
+	public Tuple0 copy(Tuple0 from, Tuple0 reuse) {
+		return reuse;
+	}
+
+	/** The serialized form always has a length of one byte. */
+	@Override
+	public int getLength() {
+		return 1;
+	}
+
+	/** Writes a single byte with the value 42; the record itself is not looked at. */
+	@Override
+	public void serialize(Tuple0 record, DataOutputView target) throws IOException {
+		target.writeByte(42);
+	}
+
+	/** Consumes one byte from the source and returns the shared {@link Tuple0#INSTANCE}. */
+	@Override
+	public Tuple0 deserialize(DataInputView source) throws IOException {
+		// the byte carries no information, it only needs to be skipped
+		source.readByte();
+		return Tuple0.INSTANCE;
+	}
+
+	/** Consumes one byte from the source and returns the given reuse object. */
+	@Override
+	public Tuple0 deserialize(Tuple0 reuse, DataInputView source) throws IOException {
+		// the byte carries no information, it only needs to be skipped
+		source.readByte();
+		return reuse;
+	}
+
+	/** Copies the single byte of a serialized tuple from source to target. */
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		final byte marker = source.readByte();
+		target.writeByte(marker);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** All instances share the hash code of the serializer class. */
+	@Override
+	public int hashCode() {
+		return Tuple0Serializer.class.hashCode();
+	}
+
+	/**
+	 * Equal to every other {@code Tuple0Serializer} that accepts this one via
+	 * {@link #canEqual(Object)}.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof Tuple0Serializer)) {
+			return false;
+		}
+
+		final Tuple0Serializer that = (Tuple0Serializer) obj;
+		return that.canEqual(this);
+	}
+
+	/** Only {@code Tuple0Serializer}s may be compared with this serializer. */
+	@Override
+	public boolean canEqual(Object obj) {
+		final boolean isTuple0Serializer = obj instanceof Tuple0Serializer;
+		return isTuple0Serializer;
+	}
+
+	@Override
+	public String toString() {
+		return "Tuple0Serializer";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
new file mode 100644
index 0000000..875ecc2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NullFieldException;
+import org.apache.flink.types.NullKeyFieldException;
+
+
+/**
+ * Comparator for {@link Tuple} types. It compares, hashes and normalizes tuples on the
+ * fields given by the key positions, delegating each key field to the comparator
+ * registered for it.
+ *
+ * <p>Key fields are read with {@code getFieldNotNull}; a {@link NullFieldException} from
+ * that call is translated into a {@link NullKeyFieldException}, and an
+ * {@link IndexOutOfBoundsException} into a {@link KeyFieldOutOfBoundsException} that
+ * carries the original exception as its cause.
+ *
+ * @param <T> The tuple type.
+ */
+public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a comparator for the given key fields.
+	 *
+	 * @param keyPositions The positions of the key fields in the tuple.
+	 * @param comparators The comparators for the key fields, in the order of the key positions.
+	 * @param serializers The serializers used to deserialize fields for comparison.
+	 */
+	public TupleComparator(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
+		super(keyPositions, comparators, serializers);
+	}
+
+	/** Copy constructor used by {@link #duplicate()}. */
+	private TupleComparator(TupleComparator<T> toClone) {
+		super(toClone);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Comparator Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Computes a hash over all key fields. The hash of the first key field is the seed;
+	 * for every further key the running code is multiplied with a salt from
+	 * {@code HASH_SALT} and the field's hash is added.
+	 *
+	 * @param value The tuple to hash.
+	 * @return The combined hash code of the key fields.
+	 * @throws NullKeyFieldException If a key field is null.
+	 * @throws KeyFieldOutOfBoundsException If an index is out of bounds while accessing a key field.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public int hash(T value) {
+		// declared outside the try block so that the handlers can report the failing key
+		int i = 0;
+		try {
+			int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0]));
+			for (i = 1; i < this.keyPositions.length; i++) {
+				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
+				code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i]));
+			}
+			return code;
+		}
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
+		catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
+		}
+	}
+
+	/**
+	 * Sets the key fields of the given tuple as the reference in the field comparators.
+	 *
+	 * @param toCompare The tuple whose key fields become the reference.
+	 * @throws NullKeyFieldException If a key field is null.
+	 * @throws KeyFieldOutOfBoundsException If an index is out of bounds while accessing a key field.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void setReference(T toCompare) {
+		int i = 0;
+		try {
+			for (; i < this.keyPositions.length; i++) {
+				this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
+			}
+		}
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
+		catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
+		}
+	}
+
+	/**
+	 * Checks whether all key fields of the candidate equal the previously set reference.
+	 *
+	 * @param candidate The tuple to check.
+	 * @return {@code true} if every field comparator reports equality, {@code false} at the first mismatch.
+	 * @throws NullKeyFieldException If a key field is null.
+	 * @throws KeyFieldOutOfBoundsException If an index is out of bounds while accessing a key field.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public boolean equalToReference(T candidate) {
+		int i = 0;
+		try {
+			for (; i < this.keyPositions.length; i++) {
+				if (!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i]))) {
+					return false;
+				}
+			}
+			return true;
+		}
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
+		catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
+		}
+	}
+
+	/**
+	 * Compares two tuples key field by key field, in the order of the key positions.
+	 *
+	 * @param first The first tuple.
+	 * @param second The second tuple.
+	 * @return The result of the first field comparison that is not zero, or zero if all key fields compare equal.
+	 * @throws NullKeyFieldException If a key field is null.
+	 * @throws KeyFieldOutOfBoundsException If an index is out of bounds while accessing a key field.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public int compare(T first, T second) {
+		int i = 0;
+		try {
+			for (; i < keyPositions.length; i++) {
+				int keyPos = keyPositions[i];
+				int cmp = comparators[i].compare(first.getFieldNotNull(keyPos), second.getFieldNotNull(keyPos));
+
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+			return 0;
+		}
+		catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		}
+		catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
+		}
+	}
+
+	/**
+	 * Writes the normalized keys of the leading normalizable key fields one after another
+	 * into the target segment, until all of them are written or the byte budget is used up.
+	 *
+	 * @param value The tuple whose key fields are normalized.
+	 * @param target The memory segment to write to.
+	 * @param offset The offset in the segment at which writing starts.
+	 * @param numBytes The number of bytes available for the normalized key.
+	 * @throws NullKeyFieldException If a key field is null.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
+		int i = 0;
+		try {
+			for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) {
+				int len = this.normalizedKeyLengths[i];
+				// the last key that fits may only get the remaining bytes
+				len = numBytes >= len ? len : numBytes;
+				this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len);
+				numBytes -= len;
+				offset += len;
+			}
+		} catch (NullFieldException nfex) {
+			throw new NullKeyFieldException(nfex);
+		} catch (NullPointerException npex) {
+			throw new NullKeyFieldException(this.keyPositions[i]);
+		}
+	}
+
+	/**
+	 * Extracts the keys of the given record into the target array by delegating every key
+	 * field to its comparator.
+	 *
+	 * @param record The record, which is cast to {@link Tuple}.
+	 * @param target The array that receives the keys.
+	 * @param index The position in the target array at which writing starts.
+	 * @return The sum of the values returned by the field comparators' {@code extractKeys} calls.
+	 */
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		int localIndex = index;
+		for(int i = 0; i < comparators.length; i++) {
+			localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex);
+		}
+		return localIndex - index;
+	}
+
+	/**
+	 * Creates a copy of this comparator through the copy constructor.
+	 *
+	 * @return A new {@code TupleComparator} cloned from this one.
+	 */
+	@Override
+	public TypeComparator<T> duplicate() {
+		return new TupleComparator<T>(this);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
new file mode 100644
index 0000000..28169e5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.NullKeyFieldException;
+
+
+/**
+ * Base class for comparators of tuple-like types. It holds the key positions, the
+ * per-key comparators and the field serializers, derives the normalized-key layout
+ * from the leading key comparators, and implements the comparison of references and
+ * of serialized records.
+ *
+ * @param <T> The type compared by this comparator.
+ */
+public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** key positions describe which fields are keys in what order */
+	protected int[] keyPositions;
+
+	/** comparators for the key fields, in the same order as the key fields */
+	@SuppressWarnings("rawtypes")
+	protected TypeComparator[] comparators;
+
+	/** normalized key length of each leading normalizable key, indexed like the key positions */
+	protected int[] normalizedKeyLengths;
+
+	/** number of leading keys that take part in the normalized key */
+	protected int numLeadingNormalizableKeys;
+
+	/** summed length of the normalized key prefix; {@code Integer.MAX_VALUE} if the sum overflowed */
+	protected int normalizableKeyPrefixLen;
+
+	/** inversion flag of the normalized key, as decided by the first key comparator */
+	protected boolean invertNormKey;
+
+
+	/** serializers to deserialize the first n fields for comparison */
+	@SuppressWarnings("rawtypes")
+	protected TypeSerializer[] serializers;
+
+	// cache for the deserialized field objects
+	protected transient Object[] deserializedFields1;
+	protected transient Object[] deserializedFields2;
+
+
+	/**
+	 * Creates the comparator and computes the normalized-key layout. Leading keys are
+	 * added to the normalized key until a key does not support normalized keys, disagrees
+	 * with the first key on the inversion direction, or the summed length overflows.
+	 *
+	 * @param keyPositions The positions of the key fields.
+	 * @param comparators The comparators for the key fields, in the order of the key positions.
+	 * @param serializers The serializers used to deserialize fields for comparison.
+	 * @throws RuntimeException If a comparator reports a negative normalized key length.
+	 */
+	@SuppressWarnings("unchecked")
+	public TupleComparatorBase(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
+		// set the default utils
+		this.keyPositions = keyPositions;
+		this.comparators = (TypeComparator<Object>[]) comparators;
+		this.serializers = (TypeSerializer<Object>[]) serializers;
+
+		// set up auxiliary fields for normalized key support
+		this.normalizedKeyLengths = new int[keyPositions.length];
+		int nKeys = 0;
+		int nKeyLen = 0;
+		boolean inverted = false;
+
+		for (int i = 0; i < this.keyPositions.length; i++) {
+			TypeComparator<?> k = this.comparators[i];
+
+			// as long as the leading keys support normalized keys, we can build up the composite key
+			if (k.supportsNormalizedKey()) {
+				if (i == 0) {
+					// the first comparator decides whether we need to invert the key direction
+					inverted = k.invertNormalizedKey();
+				}
+				else if (k.invertNormalizedKey() != inverted) {
+					// if a successor does not agree on the inversion direction, it cannot be part of the normalized key
+					break;
+				}
+
+				nKeys++;
+				final int len = k.getNormalizeKeyLen();
+				if (len < 0) {
+					throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
+				}
+				this.normalizedKeyLengths[i] = len;
+				nKeyLen += len;
+
+				if (nKeyLen < 0) {
+					// overflow, which means we are out of budget for normalized key space anyways
+					nKeyLen = Integer.MAX_VALUE;
+					break;
+				}
+			} else {
+				break;
+			}
+		}
+		this.numLeadingNormalizableKeys = nKeys;
+		this.normalizableKeyPrefixLen = nKeyLen;
+		this.invertNormKey = inverted;
+	}
+
+	/**
+	 * Copy constructor, see {@link #privateDuplicate(TupleComparatorBase)}.
+	 *
+	 * @param toClone The comparator to copy.
+	 */
+	protected TupleComparatorBase(TupleComparatorBase<T> toClone) {
+		privateDuplicate(toClone);
+	}
+
+	/**
+	 * Copies the state of the given comparator into this one. The serializers and
+	 * comparators are duplicated one by one; the key positions and normalized key
+	 * lengths arrays are shared by reference.
+	 *
+	 * <p>We need this because we cannot call the cloning constructor from the
+	 * ScalaTupleComparator.
+	 *
+	 * @param toClone The comparator to copy.
+	 */
+	protected void privateDuplicate(TupleComparatorBase<T> toClone) {
+		// copy fields and serializer factories
+		this.keyPositions = toClone.keyPositions;
+
+		this.serializers = new TypeSerializer[toClone.serializers.length];
+		for (int i = 0; i < toClone.serializers.length; i++) {
+			this.serializers[i] = toClone.serializers[i].duplicate();
+		}
+
+		this.comparators = new TypeComparator[toClone.comparators.length];
+		for (int i = 0; i < toClone.comparators.length; i++) {
+			this.comparators[i] = toClone.comparators[i].duplicate();
+		}
+
+		this.normalizedKeyLengths = toClone.normalizedKeyLengths;
+		this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
+		this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
+		this.invertNormKey = toClone.invertNormKey;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Key Positions and Flat Comparators
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * @return The key positions array itself (not a copy).
+	 */
+	protected int[] getKeyPositions() {
+		return this.keyPositions;
+	}
+
+
+	/**
+	 * Adds the key comparators to the given list. Comparators that are themselves
+	 * {@link CompositeTypeComparator}s are asked to add their flat comparators instead.
+	 *
+	 * @param flatComparators The list that receives the comparators.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public void getFlatComparator(List<TypeComparator> flatComparators) {
+		for(int i = 0; i < comparators.length; i++) {
+			if(comparators[i] instanceof CompositeTypeComparator) {
+				((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators);
+			} else {
+				flatComparators.add(comparators[i]);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Comparator Methods
+	// --------------------------------------------------------------------------------------------
+
+
+	/**
+	 * Compares the reference of this comparator with the reference of the given one by
+	 * delegating to the key comparators in key order.
+	 *
+	 * @param referencedComparator The other comparator; it is cast to {@code TupleComparatorBase}.
+	 * @return The result of the first key comparison that is not zero, or zero if all are equal.
+	 * @throws NullKeyFieldException If a {@link NullPointerException} occurs during the comparison.
+	 * @throws KeyFieldOutOfBoundsException If an index is out of bounds during the comparison.
+	 */
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		TupleComparatorBase<T> other = (TupleComparatorBase<T>) referencedComparator;
+
+		// declared outside the try block so that the handlers can report the failing key
+		int i = 0;
+		try {
+			for (; i < this.keyPositions.length; i++) {
+				@SuppressWarnings("unchecked")
+				int cmp = this.comparators[i].compareToReference(other.comparators[i]);
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+			return 0;
+		}
+		catch (NullPointerException npex) {
+			throw new NullKeyFieldException(keyPositions[i]);
+		}
+		catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
+		}
+	}
+
+	/**
+	 * Compares two serialized records. All fields covered by the serializers are
+	 * deserialized from both sources into the cached field objects, then the key fields
+	 * are compared in key order.
+	 *
+	 * @param firstSource The input holding the first record.
+	 * @param secondSource The input holding the second record.
+	 * @return The result of the first key comparison that is not zero, or zero if all are equal.
+	 * @throws IOException If reading from one of the sources fails.
+	 * @throws NullKeyFieldException If a {@link NullPointerException} occurs; it carries the
+	 *         position of the field that was being processed.
+	 * @throws KeyFieldOutOfBoundsException If an index is out of bounds; it carries the
+	 *         position of the field that was being processed.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		if (deserializedFields1 == null) {
+			instantiateDeserializationUtils();
+		}
+
+		// Position of the field currently being processed, used for error reporting only.
+		// The two loops below run over different index spaces (field positions vs. key
+		// indices), so the loop counter cannot be used to look up the key position.
+		int fieldPos = 0;
+		try {
+			for (int i = 0; i < serializers.length; i++) {
+				fieldPos = i;
+				deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource);
+				deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource);
+			}
+
+			for (int i = 0; i < keyPositions.length; i++) {
+				int keyPos = keyPositions[i];
+				fieldPos = keyPos;
+				int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]);
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+
+			return 0;
+		} catch (NullPointerException npex) {
+			throw new NullKeyFieldException(fieldPos);
+		} catch (IndexOutOfBoundsException iobex) {
+			throw new KeyFieldOutOfBoundsException(fieldPos, iobex);
+		}
+	}
+
+	/**
+	 * @return {@code true} if at least one leading key takes part in the normalized key.
+	 */
+	@Override
+	public boolean supportsNormalizedKey() {
+		return this.numLeadingNormalizableKeys > 0;
+	}
+
+	/**
+	 * @return The summed length of the normalized key prefix, as computed in the constructor.
+	 */
+	@Override
+	public int getNormalizeKeyLen() {
+		return this.normalizableKeyPrefixLen;
+	}
+
+	/**
+	 * @param keyBytes The number of bytes available for the normalized key.
+	 * @return {@code true} if not all keys are part of the normalized key, if the prefix
+	 *         length overflowed, or if the prefix is longer than the given number of bytes.
+	 */
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return this.numLeadingNormalizableKeys < this.keyPositions.length ||
+				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+				this.normalizableKeyPrefixLen > keyBytes;
+	}
+
+	/**
+	 * @return The inversion flag taken from the first key comparator.
+	 */
+	@Override
+	public boolean invertNormalizedKey() {
+		return this.invertNormKey;
+	}
+
+
+	/** Serialization with key normalization is not supported; always returns {@code false}. */
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	/** Not supported; always throws {@link UnsupportedOperationException}. */
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	/** Not supported; always throws {@link UnsupportedOperationException}. */
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Allocates the two caches for deserialized fields and fills them with fresh
+	 * instances created by the field serializers.
+	 */
+	protected final void instantiateDeserializationUtils() {
+		this.deserializedFields1 = new Object[this.serializers.length];
+		this.deserializedFields2 = new Object[this.serializers.length];
+
+		for (int i = 0; i < this.serializers.length; i++) {
+			this.deserializedFields1[i] = this.serializers[i].createInstance();
+			this.deserializedFields2[i] = this.serializers[i].createInstance();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * A sequence of prime numbers to be used for salting the computed hash values.
+	 * Based on some empirical evidence, we are using a 32-element subsequence of the
+	 * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
+	 *
+	 * @see <a href="http://en.wikipedia.org/wiki/List_of_prime_numbers">http://en.wikipedia.org/wiki/List_of_prime_numbers</a>
+	 * @see <a href="http://oeis.org/A068652">http://oeis.org/A068652</a>
+	 */
+	public static final int[] HASH_SALT = new int[] {
+		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   ,
+		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  ,
+		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 ,
+		19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
new file mode 100644
index 0000000..0897063
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.NullFieldException;
+
+
+/**
+ * Serializer for {@link Tuple} types. Every field is handled by the field serializer at
+ * the same position, and new tuples are created reflectively from the tuple class.
+ *
+ * @param <T> The tuple type handled by this serializer.
+ */
+public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+		super(tupleClass, fieldSerializers);
+	}
+
+	/**
+	 * Duplicates all field serializers. If every field serializer returned itself from
+	 * {@code duplicate()}, this serializer is returned as is; otherwise a new serializer
+	 * wrapping the duplicated field serializers is created.
+	 */
+	@Override
+	public TupleSerializer<T> duplicate() {
+		final int numFields = fieldSerializers.length;
+		final TypeSerializer<?>[] copies = new TypeSerializer<?>[numFields];
+		boolean anyFieldDuplicated = false;
+
+		for (int pos = 0; pos < numFields; pos++) {
+			copies[pos] = fieldSerializers[pos].duplicate();
+			// a serializer that hands out a different instance is treated as stateful
+			anyFieldDuplicated |= (copies[pos] != fieldSerializers[pos]);
+		}
+
+		return anyFieldDuplicated ? new TupleSerializer<T>(tupleClass, copies) : this;
+	}
+
+	/**
+	 * Creates a tuple whose fields are populated with the instances created by the
+	 * field serializers.
+	 */
+	@Override
+	public T createInstance() {
+		try {
+			final T tuple = tupleClass.newInstance();
+
+			int pos = 0;
+			while (pos < arity) {
+				tuple.setField(fieldSerializers[pos].createInstance(), pos);
+				pos++;
+			}
+
+			return tuple;
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate tuple.", e);
+		}
+	}
+
+	/**
+	 * Creates a tuple and sets its fields to the given values.
+	 */
+	@Override
+	public T createInstance(Object[] fields) {
+		try {
+			final T tuple = tupleClass.newInstance();
+
+			int pos = 0;
+			while (pos < arity) {
+				tuple.setField(fields[pos], pos);
+				pos++;
+			}
+
+			return tuple;
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate tuple.", e);
+		}
+	}
+
+	/**
+	 * Writes the given values into the supplied tuple and returns that same tuple.
+	 */
+	@Override
+	public T createOrReuseInstance(Object[] fields, T reuse) {
+		int pos = 0;
+		while (pos < arity) {
+			reuse.setField(fields[pos], pos);
+			pos++;
+		}
+		return reuse;
+	}
+
+	@Override
+	public T copy(T from) {
+		final T result = instantiateRaw();
+		for (int pos = 0; pos < arity; pos++) {
+			final Object fieldCopy = fieldSerializers[pos].copy(from.getField(pos));
+			result.setField(fieldCopy, pos);
+		}
+		return result;
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		for (int pos = 0; pos < arity; pos++) {
+			// the field currently stored in 'reuse' is offered to the field serializer for reuse
+			final Object fieldCopy = fieldSerializers[pos].copy(from.getField(pos), reuse.getField(pos));
+			reuse.setField(fieldCopy, pos);
+		}
+
+		return reuse;
+	}
+
+	/**
+	 * Serializes the fields in order. A {@link NullPointerException} raised while
+	 * serializing a field is rethrown as a {@link NullFieldException} carrying the
+	 * position of that field.
+	 */
+	@Override
+	public void serialize(T value, DataOutputView target) throws IOException {
+		for (int pos = 0; pos < arity; pos++) {
+			final Object fieldValue = value.getField(pos);
+			try {
+				fieldSerializers[pos].serialize(fieldValue, target);
+			} catch (NullPointerException cause) {
+				throw new NullFieldException(pos, cause);
+			}
+		}
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		final T result = instantiateRaw();
+		for (int pos = 0; pos < arity; pos++) {
+			final Object fieldValue = fieldSerializers[pos].deserialize(source);
+			result.setField(fieldValue, pos);
+		}
+		return result;
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		for (int pos = 0; pos < arity; pos++) {
+			final Object fieldValue = fieldSerializers[pos].deserialize(reuse.getField(pos), source);
+			reuse.setField(fieldValue, pos);
+		}
+		return reuse;
+	}
+
+	/**
+	 * Creates a tuple through the tuple class without touching any of its fields.
+	 */
+	private T instantiateRaw() {
+		final T tuple;
+		try {
+			tuple = tupleClass.newInstance();
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate tuple.", e);
+		}
+		return tuple;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
new file mode 100644
index 0000000..fc657a1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * Base class for tuple serializers. It keeps the tuple class, the per-field serializers
+ * and the arity, and implements the operations that do not depend on how a tuple
+ * instance is created.
+ *
+ * @param <T> The tuple type handled by the serializer.
+ */
+public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	protected final Class<T> tupleClass;
+
+	protected final TypeSerializer<Object>[] fieldSerializers;
+
+	protected final int arity;
+
+	@SuppressWarnings("unchecked")
+	public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+		Preconditions.checkNotNull(tupleClass);
+		Preconditions.checkNotNull(fieldSerializers);
+
+		this.tupleClass = tupleClass;
+		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
+		// the arity is defined by the number of field serializers
+		this.arity = fieldSerializers.length;
+	}
+
+	public Class<T> getTupleClass() {
+		return this.tupleClass;
+	}
+
+	public int getArity() {
+		return arity;
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	// We use this in the Aggregate and Distinct Operators to create instances
+	// of immutable Tuples (i.e. Scala Tuples)
+	public abstract T createInstance(Object[] fields);
+
+	public abstract T createOrReuseInstance(Object[] fields, T reuse);
+
+	/**
+	 * Copies one serialized tuple from source to target by letting every field
+	 * serializer copy its field, in field order.
+	 */
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		for (TypeSerializer<Object> fieldSerializer : fieldSerializers) {
+			fieldSerializer.copy(source, target);
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		final int serializersHash = Arrays.hashCode(fieldSerializers);
+		final int classAndArityHash = Objects.hash(tupleClass, arity);
+		return 31 * serializersHash + classAndArityHash;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof TupleSerializerBase)) {
+			return false;
+		}
+
+		final TupleSerializerBase<?> that = (TupleSerializerBase<?>) obj;
+		return that.canEqual(this)
+			&& tupleClass == that.tupleClass
+			&& Arrays.equals(fieldSerializers, that.fieldSerializers)
+			&& arity == that.arity;
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof TupleSerializerBase;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
new file mode 100644
index 0000000..4b9629a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * Comparator for {@link Value} types that are also {@link Comparable}. The sort
+ * direction is fixed at construction time; normalized keys are supported when the
+ * type is a {@link NormalizableKey}.
+ *
+ * @param <T> The type compared by this comparator.
+ */
+public class ValueComparator<T extends Value & Comparable<T>> extends TypeComparator<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<T> type;
+
+	private final boolean ascendingComparison;
+
+	private transient T reference;
+
+	private transient T tempReference;
+
+	private transient Kryo kryo;
+
+	@SuppressWarnings("rawtypes")
+	private final TypeComparator[] comparators = new TypeComparator[] {this};
+
+	public ValueComparator(boolean ascending, Class<T> type) {
+		this.type = type;
+		this.ascendingComparison = ascending;
+	}
+
+	@Override
+	public int hash(T record) {
+		return record.hashCode();
+	}
+
+	/**
+	 * Stores a Kryo-based copy of the given value as the reference.
+	 */
+	@Override
+	public void setReference(T toCompare) {
+		checkKryoInitialized();
+
+		final ValueSerializer<T> serializer = new ValueSerializer<T>(type);
+		this.reference = KryoUtils.copy(toCompare, kryo, serializer);
+	}
+
+	@Override
+	public boolean equalToReference(T candidate) {
+		return candidate.equals(this.reference);
+	}
+
+	/**
+	 * Compares the reference held by the given comparator against this comparator's
+	 * reference, honoring the sort direction.
+	 */
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		final ValueComparator<T> other = (ValueComparator<T>) referencedComparator;
+		return applyOrder(other.reference.compareTo(this.reference));
+	}
+
+	@Override
+	public int compare(T first, T second) {
+		return applyOrder(first.compareTo(second));
+	}
+
+	/**
+	 * Reads one value from each source into the two internal instances and compares
+	 * them. Note that the first value is read into the reference instance, so its
+	 * previous contents are overwritten.
+	 */
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		ensureReferenceInstantiated();
+		ensureTempReferenceInstantiated();
+
+		reference.read(firstSource);
+		tempReference.read(secondSource);
+
+		return applyOrder(reference.compareTo(tempReference));
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(type);
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		ensureReferenceInstantiated();
+
+		final NormalizableKey<?> normalizable = (NormalizableKey<?>) reference;
+		return normalizable.getMaxNormalizedKeyLen();
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+
+	@Override
+	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+		final NormalizableKey<?> normalizable = (NormalizableKey<?>) record;
+		normalizable.copyNormalizedKey(target, offset, numBytes);
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+
+	@Override
+	public TypeComparator<T> duplicate() {
+		return new ValueComparator<T>(ascendingComparison, type);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public TypeComparator[] getFlatComparators() {
+		return comparators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// internal helpers
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the comparison result unchanged for ascending order and negated otherwise.
+	 */
+	private int applyOrder(int comp) {
+		return ascendingComparison ? comp : -comp;
+	}
+
+	private void ensureReferenceInstantiated() {
+		if (reference == null) {
+			reference = InstantiationUtil.instantiate(type, Value.class);
+		}
+	}
+
+	private void ensureTempReferenceInstantiated() {
+		if (tempReference == null) {
+			tempReference = InstantiationUtil.instantiate(type, Value.class);
+		}
+	}
+
+	/**
+	 * Lazily creates and configures the Kryo instance used for copying values.
+	 */
+	private void checkKryoInitialized() {
+		if (this.kryo != null) {
+			return;
+		}
+
+		this.kryo = new Kryo();
+
+		final Kryo.DefaultInstantiatorStrategy strategy = new Kryo.DefaultInstantiatorStrategy();
+		strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+		this.kryo.setInstantiatorStrategy(strategy);
+
+		this.kryo.setAsmEnabled(true);
+		this.kryo.register(type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
new file mode 100644
index 0000000..9329866
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.InstantiationUtil;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * Serializer for {@link Value} types. Reading and writing is delegated to the value's
+ * own {@code read}/{@code write} methods, while deep object copies are made with Kryo.
+ *
+ * @param <T> The type serialized.
+ */
+public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<T> type;
+
+	private transient Kryo kryo;
+
+	private transient T copyInstance;
+
+	// --------------------------------------------------------------------------------------------
+
+	public ValueSerializer(Class<T> type) {
+		Preconditions.checkNotNull(type);
+		this.type = type;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public ValueSerializer<T> duplicate() {
+		return new ValueSerializer<T>(type);
+	}
+
+	@Override
+	public T createInstance() {
+		return InstantiationUtil.instantiate(this.type);
+	}
+
+	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+
+		final T duplicate = KryoUtils.copy(from, kryo, this);
+		return duplicate;
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		checkKryoInitialized();
+
+		final T duplicate = KryoUtils.copy(from, reuse, kryo, this);
+		return duplicate;
+	}
+
+	@Override
+	public void serialize(T value, DataOutputView target) throws IOException {
+		value.write(target);
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		final T instance = createInstance();
+		return deserialize(instance, source);
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		reuse.read(source);
+		return reuse;
+	}
+
+	/**
+	 * Copies one serialized value from source to target by reading it into a lazily
+	 * created scratch instance and writing that instance out again.
+	 */
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		if (this.copyInstance == null) {
+			this.copyInstance = InstantiationUtil.instantiate(type);
+		}
+
+		final T scratch = this.copyInstance;
+		scratch.read(source);
+		scratch.write(target);
+	}
+
+	/**
+	 * Lazily creates and configures the Kryo instance used for object copies.
+	 */
+	private void checkKryoInitialized() {
+		if (this.kryo != null) {
+			return;
+		}
+
+		this.kryo = new Kryo();
+
+		final Kryo.DefaultInstantiatorStrategy strategy = new Kryo.DefaultInstantiatorStrategy();
+		strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+		this.kryo.setInstantiatorStrategy(strategy);
+
+		this.kryo.setAsmEnabled(true);
+		this.kryo.register(type);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return this.type.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof ValueSerializer)) {
+			return false;
+		}
+
+		final ValueSerializer<?> that = (ValueSerializer<?>) obj;
+		return that.canEqual(this) && type == that.type;
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ValueSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
new file mode 100644
index 0000000..a03369a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+/**
+ * Comparator for Hadoop {@link Writable} types that are also {@link Comparable}. The
+ * sort direction is fixed at construction time; normalized keys are supported when the
+ * type is a {@link NormalizableKey}.
+ *
+ * @param <T> The type compared by this comparator.
+ */
+public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	// assigned only in the constructor, hence final (like ascendingComparison)
+	private final Class<T> type;
+	
+	private final boolean ascendingComparison;
+	
+	private transient T reference;
+	
+	private transient T tempReference;
+	
+	private transient Kryo kryo;
+
+	@SuppressWarnings("rawtypes")
+	private final TypeComparator[] comparators = new TypeComparator[] {this};
+
+	public WritableComparator(boolean ascending, Class<T> type) {
+		this.type = type;
+		this.ascendingComparison = ascending;
+	}
+	
+	@Override
+	public int hash(T record) {
+		return record.hashCode();
+	}
+	
+	/**
+	 * Stores a Kryo-based copy of the given value as the reference.
+	 */
+	@Override
+	public void setReference(T toCompare) {
+		checkKryoInitialized();
+
+		reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
+	}
+	
+	@Override
+	public boolean equalToReference(T candidate) {
+		return candidate.equals(reference);
+	}
+	
+	/**
+	 * Compares the reference held by the given comparator against this comparator's
+	 * reference, honoring the sort direction.
+	 */
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		T otherRef = ((WritableComparator<T>) referencedComparator).reference;
+		int comp = otherRef.compareTo(reference);
+		return ascendingComparison ? comp : -comp;
+	}
+	
+	@Override
+	public int compare(T first, T second) {
+		int comp = first.compareTo(second);
+		return ascendingComparison ? comp : -comp;
+	}
+	
+	/**
+	 * Reads one value from each source into the two internal instances and compares
+	 * them. Note that the first value is read into the reference instance, so its
+	 * previous contents are overwritten.
+	 */
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		ensureReferenceInstantiated();
+		ensureTempReferenceInstantiated();
+		
+		reference.readFields(firstSource);
+		tempReference.readFields(secondSource);
+		
+		int comp = reference.compareTo(tempReference);
+		return ascendingComparison ? comp : -comp;
+	}
+	
+	@Override
+	public boolean supportsNormalizedKey() {
+		return NormalizableKey.class.isAssignableFrom(type);
+	}
+	
+	@Override
+	public int getNormalizeKeyLen() {
+		ensureReferenceInstantiated();
+		
+		NormalizableKey<?> key = (NormalizableKey<?>) reference;
+		return key.getMaxNormalizedKeyLen();
+	}
+	
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < getNormalizeKeyLen();
+	}
+	
+	@Override
+	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+		NormalizableKey<?> key = (NormalizableKey<?>) record;
+		key.copyNormalizedKey(target, offset, numBytes);
+	}
+	
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascendingComparison;
+	}
+	
+	@Override
+	public TypeComparator<T> duplicate() {
+		return new WritableComparator<T>(ascendingComparison, type);
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public TypeComparator[] getFlatComparators() {
+		return comparators;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// unsupported normalization
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+	
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+	
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Lazily creates and configures the Kryo instance used for copying the reference.
+	 */
+	private void checkKryoInitialized() {
+		if (this.kryo == null) {
+			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+			this.kryo.setAsmEnabled(true);
+			this.kryo.register(type);
+		}
+	}
+	
+	/** Creates the reference instance if it does not exist yet. */
+	private void ensureReferenceInstantiated() {
+		if (reference == null) {
+			reference = InstantiationUtil.instantiate(type, Writable.class);
+		}
+	}
+	
+	/** Creates the temporary comparison instance if it does not exist yet. */
+	private void ensureTempReferenceInstantiated() {
+		if (tempReference == null) {
+			tempReference = InstantiationUtil.instantiate(type, Writable.class);
+		}
+	}
+}


Mime
View raw message