flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [08/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:08 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
deleted file mode 100644
index de24956..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ /dev/null
@@ -1,592 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-
-public final class PojoSerializer<T> extends TypeSerializer<T> {
-
-	// Flags for the header
-	private static byte IS_NULL = 1;
-	private static byte NO_SUBCLASS = 2;
-	private static byte IS_SUBCLASS = 4;
-	private static byte IS_TAGGED_SUBCLASS = 8;
-
-	private static final long serialVersionUID = 1L;
-
-	private final Class<T> clazz;
-
-	private final TypeSerializer<Object>[] fieldSerializers;
-
-	private final int numFields;
-
-	private final Map<Class<?>, Integer> registeredClasses;
-
-	private final TypeSerializer<?>[] registeredSerializers;
-
-	private final ExecutionConfig executionConfig;
-
-	private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
-	private transient ClassLoader cl;
-	// We need to handle these ourselves in writeObject()/readObject()
-	private transient Field[] fields;
-
-	@SuppressWarnings("unchecked")
-	public PojoSerializer(
-			Class<T> clazz,
-			TypeSerializer<?>[] fieldSerializers,
-			Field[] fields,
-			ExecutionConfig executionConfig) {
-
-		this.clazz = Preconditions.checkNotNull(clazz);
-		this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
-		this.fields = Preconditions.checkNotNull(fields);
-		this.numFields = fieldSerializers.length;
-		this.executionConfig = Preconditions.checkNotNull(executionConfig);
-
-		LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
-
-		for (int i = 0; i < numFields; i++) {
-			this.fields[i].setAccessible(true);
-		}
-
-		cl = Thread.currentThread().getContextClassLoader();
-
-		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
-
-		// We only want those classes that are not our own class and are actually sub-classes.
-		List<Class<?>> cleanedTaggedClasses = new ArrayList<Class<?>>(registeredPojoTypes.size());
-		for (Class<?> registeredClass: registeredPojoTypes) {
-			if (registeredClass.equals(clazz)) {
-				continue;
-			}
-			if (!clazz.isAssignableFrom(registeredClass)) {
-				continue;
-			}
-			cleanedTaggedClasses.add(registeredClass);
-
-		}
-		this.registeredClasses = new LinkedHashMap<Class<?>, Integer>(cleanedTaggedClasses.size());
-		registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
-
-		int id = 0;
-		for (Class<?> registeredClass: cleanedTaggedClasses) {
-			this.registeredClasses.put(registeredClass, id);
-			TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(registeredClass);
-			registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
-
-			id++;
-		}
-	}
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		out.writeInt(fields.length);
-		for (Field field: fields) {
-			out.writeObject(field.getDeclaringClass());
-			out.writeUTF(field.getName());
-		}
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		int numFields = in.readInt();
-		fields = new Field[numFields];
-		for (int i = 0; i < numFields; i++) {
-			Class<?> clazz = (Class<?>)in.readObject();
-			String fieldName = in.readUTF();
-			fields[i] = null;
-			// try superclasses as well
-			while (clazz != null) {
-				try {
-					fields[i] = clazz.getDeclaredField(fieldName);
-					fields[i].setAccessible(true);
-					break;
-				} catch (NoSuchFieldException e) {
-					clazz = clazz.getSuperclass();
-				}
-			}
-			if (fields[i] == null) {
-				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-						+ " (" + fieldName + ")");
-			}
-		}
-
-		cl = Thread.currentThread().getContextClassLoader();
-		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
-	}
-
-	private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
-		TypeSerializer<?> result = subclassSerializerCache.get(subclass);
-		if (result == null) {
-
-			TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(subclass);
-			result = typeInfo.createSerializer(executionConfig);
-			if (result instanceof PojoSerializer) {
-				PojoSerializer<?> subclassSerializer = (PojoSerializer<?>) result;
-				subclassSerializer.copyBaseFieldOrder(this);
-			}
-			subclassSerializerCache.put(subclass, result);
-
-		}
-		return result;
-	}
-
-	@SuppressWarnings("unused")
-	private boolean hasField(Field f) {
-		for (Field field: fields) {
-			if (f.equals(field)) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
-		// do nothing for now, but in the future, adapt subclass serializer to have same
-		// ordering as base class serializer so that binary comparison on base class fields
-		// can work
-	}
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public PojoSerializer<T> duplicate() {
-		boolean stateful = false;
-		TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
-
-		for (int i = 0; i < fieldSerializers.length; i++) {
-			duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
-			if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
-				// at least one of them is stateful
-				stateful = true;
-			}
-		}
-
-		if (stateful) {
-			return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
-		} else {
-			return this;
-		}
-	}
-
-	
-	@Override
-	public T createInstance() {
-		if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
-			return null;
-		}
-		try {
-			T t = clazz.newInstance();
-			initializeFields(t);
-			return t;
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Cannot instantiate class.", e);
-		}
-	}
-
-	protected void initializeFields(T t) {
-		for (int i = 0; i < numFields; i++) {
-			try {
-				fields[i].set(t, fieldSerializers[i].createInstance());
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Cannot initialize fields.", e);
-			}
-		}
-	}
-
-	@Override
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public T copy(T from) {
-		if (from == null) {
-			return null;
-		}
-
-		Class<?> actualType = from.getClass();
-		if (actualType == clazz) {
-			T target;
-			try {
-				target = (T) from.getClass().newInstance();
-			}
-			catch (Throwable t) {
-				throw new RuntimeException("Cannot instantiate class.", t);
-			}
-			// no subclass
-			try {
-				for (int i = 0; i < numFields; i++) {
-					Object value = fields[i].get(from);
-					if (value != null) {
-						Object copy = fieldSerializers[i].copy(value);
-						fields[i].set(target, copy);
-					}
-					else {
-						fields[i].set(target, null);
-					}
-				}
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.");
-
-			}
-			return target;
-		} else {
-			// subclass
-			TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
-			return (T) subclassSerializer.copy(from);
-		}
-	}
-	
-	@Override
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public T copy(T from, T reuse) {
-		if (from == null) {
-			return null;
-		}
-
-		Class<?> actualType = from.getClass();
-		if (reuse == null || actualType != reuse.getClass()) {
-			// cannot reuse, do a non-reuse copy
-			return copy(from);
-		}
-
-		if (actualType == clazz) {
-			try {
-				for (int i = 0; i < numFields; i++) {
-					Object value = fields[i].get(from);
-					if (value != null) {
-						Object reuseValue = fields[i].get(reuse);
-						Object copy;
-						if(reuseValue != null) {
-							copy = fieldSerializers[i].copy(value, reuseValue);
-						}
-						else {
-							copy = fieldSerializers[i].copy(value);
-						}
-						fields[i].set(reuse, copy);
-					}
-					else {
-						fields[i].set(reuse, null);
-					}
-				}
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
-			}
-		} else {
-			TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
-			reuse = (T) subclassSerializer.copy(from, reuse);
-		}
-
-		return reuse;
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-
-	@Override
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public void serialize(T value, DataOutputView target) throws IOException {
-		int flags = 0;
-		// handle null values
-		if (value == null) {
-			flags |= IS_NULL;
-			target.writeByte(flags);
-			return;
-		}
-
-		Integer subclassTag = -1;
-		Class<?> actualClass = value.getClass();
-		TypeSerializer subclassSerializer = null;
-		if (clazz != actualClass) {
-			subclassTag = registeredClasses.get(actualClass);
-			if (subclassTag != null) {
-				flags |= IS_TAGGED_SUBCLASS;
-				subclassSerializer = registeredSerializers[subclassTag];
-			} else {
-				flags |= IS_SUBCLASS;
-				subclassSerializer = getSubclassSerializer(actualClass);
-			}
-		} else {
-			flags |= NO_SUBCLASS;
-		}
-
-		target.writeByte(flags);
-
-		if ((flags & IS_SUBCLASS) != 0) {
-			target.writeUTF(actualClass.getName());
-		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
-			target.writeByte(subclassTag);
-		}
-
-
-		if ((flags & NO_SUBCLASS) != 0) {
-			try {
-				for (int i = 0; i < numFields; i++) {
-					Object o = fields[i].get(value);
-					if (o == null) {
-						target.writeBoolean(true); // null field handling
-					} else {
-						target.writeBoolean(false);
-						fieldSerializers[i].serialize(o, target);
-					}
-				}
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
-
-			}
-		} else {
-			// subclass
-			if (subclassSerializer != null) {
-				subclassSerializer.serialize(value, target);
-			}
-		}
-	}
-
-	@Override
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public T deserialize(DataInputView source) throws IOException {
-		int flags = source.readByte();
-		if((flags & IS_NULL) != 0) {
-			return null;
-		}
-
-		T target;
-
-		Class<?> actualSubclass = null;
-		TypeSerializer subclassSerializer = null;
-
-		if ((flags & IS_SUBCLASS) != 0) {
-			String subclassName = source.readUTF();
-			try {
-				actualSubclass = Class.forName(subclassName, true, cl);
-			} catch (ClassNotFoundException e) {
-				throw new RuntimeException("Cannot instantiate class.", e);
-			}
-			subclassSerializer = getSubclassSerializer(actualSubclass);
-			target = (T) subclassSerializer.createInstance();
-			// also initialize fields for which the subclass serializer is not responsible
-			initializeFields(target);
-		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
-
-			int subclassTag = source.readByte();
-			subclassSerializer = registeredSerializers[subclassTag];
-			target = (T) subclassSerializer.createInstance();
-			// also initialize fields for which the subclass serializer is not responsible
-			initializeFields(target);
-		} else {
-			target = createInstance();
-		}
-
-		if ((flags & NO_SUBCLASS) != 0) {
-			try {
-				for (int i = 0; i < numFields; i++) {
-					boolean isNull = source.readBoolean();
-					if (isNull) {
-						fields[i].set(target, null);
-					} else {
-						Object field = fieldSerializers[i].deserialize(source);
-						fields[i].set(target, field);
-					}
-				}
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
-
-			}
-		} else {
-			if (subclassSerializer != null) {
-				target = (T) subclassSerializer.deserialize(target, source);
-			}
-		}
-		return target;
-	}
-	
-	@Override
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-
-		// handle null values
-		int flags = source.readByte();
-		if((flags & IS_NULL) != 0) {
-			return null;
-		}
-
-		Class<?> subclass = null;
-		TypeSerializer subclassSerializer = null;
-		if ((flags & IS_SUBCLASS) != 0) {
-			String subclassName = source.readUTF();
-			try {
-				subclass = Class.forName(subclassName, true, cl);
-			} catch (ClassNotFoundException e) {
-				throw new RuntimeException("Cannot instantiate class.", e);
-			}
-			subclassSerializer = getSubclassSerializer(subclass);
-
-			if (reuse == null || subclass != reuse.getClass()) {
-				// cannot reuse
-				reuse = (T) subclassSerializer.createInstance();
-				// also initialize fields for which the subclass serializer is not responsible
-				initializeFields(reuse);
-			}
-		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
-			int subclassTag = source.readByte();
-			subclassSerializer = registeredSerializers[subclassTag];
-
-			if (reuse == null || ((PojoSerializer)subclassSerializer).clazz != reuse.getClass()) {
-				// cannot reuse
-				reuse = (T) subclassSerializer.createInstance();
-				// also initialize fields for which the subclass serializer is not responsible
-				initializeFields(reuse);
-			}
-		} else {
-			if (reuse == null || clazz != reuse.getClass()) {
-				reuse = createInstance();
-			}
-		}
-
-		if ((flags & NO_SUBCLASS) != 0) {
-			try {
-				for (int i = 0; i < numFields; i++) {
-					boolean isNull = source.readBoolean();
-					if (isNull) {
-						fields[i].set(reuse, null);
-					} else {
-						Object field;
-
-						Object reuseField = fields[i].get(reuse);
-						if(reuseField != null) {
-							field = fieldSerializers[i].deserialize(reuseField, source);
-						}
-						else {
-							field = fieldSerializers[i].deserialize(source);
-						}
-
-						fields[i].set(reuse, field);
-					}
-				}
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException(
-						"Error during POJO copy, this should not happen since we check the fields before.");
-			}
-		} else {
-			if (subclassSerializer != null) {
-				reuse = (T) subclassSerializer.deserialize(reuse, source);
-			}
-		}
-
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		// copy the flags
-		int flags = source.readByte();
-		target.writeByte(flags);
-
-		if ((flags & IS_NULL) != 0) {
-			// is a null value, nothing further to copy
-			return;
-		}
-
-		TypeSerializer<?> subclassSerializer = null;
-		if ((flags & IS_SUBCLASS) != 0) {
-			String className = source.readUTF();
-			target.writeUTF(className);
-			try {
-				Class<?> subclass = Class.forName(className, true, Thread.currentThread()
-						.getContextClassLoader());
-				subclassSerializer = getSubclassSerializer(subclass);
-			} catch (ClassNotFoundException e) {
-				throw new RuntimeException("Cannot instantiate class.", e);
-			}
-		} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
-			int subclassTag = source.readByte();
-			target.writeByte(subclassTag);
-			subclassSerializer = registeredSerializers[subclassTag];
-		}
-
-		if ((flags & NO_SUBCLASS) != 0) {
-			for (int i = 0; i < numFields; i++) {
-				boolean isNull = source.readBoolean();
-				target.writeBoolean(isNull);
-				if (!isNull) {
-					fieldSerializers[i].copy(source, target);
-				}
-			}
-		} else {
-			if (subclassSerializer != null) {
-				subclassSerializer.copy(source, target);
-			}
-		}
-	}
-	
-	@Override
-	public int hashCode() {
-		return 31 * (31 * Arrays.hashCode(fieldSerializers) + Arrays.hashCode(registeredSerializers)) +
-			Objects.hash(clazz, numFields, registeredClasses);
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof PojoSerializer) {
-			PojoSerializer<?> other = (PojoSerializer<?>) obj;
-
-			return other.canEqual(this) &&
-				clazz == other.clazz &&
-				Arrays.equals(fieldSerializers, other.fieldSerializers) &&
-				Arrays.equals(registeredSerializers, other.registeredSerializers) &&
-				numFields == other.numFields &&
-				registeredClasses.equals(other.registeredClasses);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof PojoSerializer;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
deleted file mode 100644
index 4b734a7..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeComparatorFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class RuntimeComparatorFactory<T> implements TypeComparatorFactory<T>, java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-
-	private static final String CONFIG_KEY = "SER_DATA";
-
-	private TypeComparator<T> comparator;
-
-
-	public RuntimeComparatorFactory() {}
-
-	public RuntimeComparatorFactory(TypeComparator<T> comparator) {
-		this.comparator = comparator;
-	}
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {
-		try {
-			InstantiationUtil.writeObjectToConfig(comparator, config, CONFIG_KEY);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not serialize comparator into the configuration.", e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
-		try {
-			comparator = (TypeComparator<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY, cl);
-		}
-		catch (ClassNotFoundException e) {
-			throw e;
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not serialize serializer into the configuration.", e);
-		}
-	}
-
-	@Override
-	public TypeComparator<T> createComparator() {
-		if (comparator != null) {
-			return comparator;
-		} else {
-			throw new RuntimeException("ComparatorFactory has not been initialized from configuration.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
deleted file mode 100644
index 31e28f7..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimePairComparatorFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-
-public final class RuntimePairComparatorFactory<T1, T2>
-		implements TypePairComparatorFactory<T1, T2>, java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public TypePairComparator<T1, T2> createComparator12(
-			TypeComparator<T1> comparator1,
-			TypeComparator<T2> comparator2) {
-		return new GenericPairComparator<T1, T2>(comparator1, comparator2);
-	}
-
-	@Override
-	public TypePairComparator<T2, T1> createComparator21(
-			TypeComparator<T1> comparator1,
-			TypeComparator<T2> comparator2) {
-		return new GenericPairComparator<T2, T1>(comparator2, comparator1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
deleted file mode 100644
index 96aff73..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class RuntimeSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-
-	private static final String CONFIG_KEY_SER = "SER_DATA";
-
-	private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
-
-	
-	private TypeSerializer<T> serializer;
-
-	private boolean firstSerializer = true;
-
-	private Class<T> clazz;
-
-	// Because we read the class from the TaskConfig and instantiate ourselves
-	public RuntimeSerializerFactory() {}
-
-	public RuntimeSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) {
-		if (serializer == null || clazz == null) {
-			throw new NullPointerException();
-		}
-
-		this.clazz = clazz;
-		this.serializer = serializer;
-	}
-
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {
-		try {
-			InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS);
-			InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not serialize serializer into the configuration.", e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
-		if (config == null || cl == null) {
-			throw new NullPointerException();
-		}
-		
-		try {
-			this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
-			this.serializer = (TypeSerializer<T>)  InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
-			firstSerializer = true;
-		}
-		catch (ClassNotFoundException e) {
-			throw e;
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not load deserializer from the configuration.", e);
-		}
-	}
-
-	@Override
-	public TypeSerializer<T> getSerializer() {
-		if (this.serializer != null) {
-			if (firstSerializer) {
-				firstSerializer = false;
-				return this.serializer;
-			} else {
-				return this.serializer.duplicate();
-			}
-		} else {
-			throw new RuntimeException("SerializerFactory has not been initialized from configuration.");
-		}
-	}
-
-	@Override
-	public Class<T> getDataType() {
-		return clazz;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return clazz.hashCode() ^ serializer.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof RuntimeSerializerFactory) {
-			RuntimeSerializerFactory<?> other = (RuntimeSerializerFactory<?>) obj;
-			
-			return this.clazz == other.clazz &&
-					this.serializer.equals(other.serializer);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
deleted file mode 100644
index a06ff1a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class Tuple0Serializer extends TupleSerializer<Tuple0> {
-	
-	private static final long serialVersionUID = 1278813169022975971L;
-
-	public static final Tuple0Serializer INSTANCE = new Tuple0Serializer();
-
-	// ------------------------------------------------------------------------
-	
-	private Tuple0Serializer() {
-		super(Tuple0.class, new TypeSerializer<?>[0]);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public Tuple0Serializer duplicate() {
-		return this;
-	}
-
-	@Override
-	public Tuple0 createInstance() {
-		return Tuple0.INSTANCE;
-	}
-
-	@Override
-	public Tuple0 createInstance(Object[] fields) {
-		if (fields == null || fields.length == 0) {
-			return Tuple0.INSTANCE;
-		}
-
-		throw new UnsupportedOperationException(
-				"Tuple0 cannot take any data, as it has zero fields.");
-	}
-
-	@Override
-	public Tuple0 copy(Tuple0 from) {
-		return from;
-	}
-
-	@Override
-	public Tuple0 copy(Tuple0 from, Tuple0 reuse) {
-		return reuse;
-	}
-
-	@Override
-	public int getLength() {
-		return 1;
-	}
-
-	@Override
-	public void serialize(Tuple0 record, DataOutputView target) throws IOException {
-		target.writeByte(42);
-	}
-
-	@Override
-	public Tuple0 deserialize(DataInputView source) throws IOException {
-		source.readByte();
-		return Tuple0.INSTANCE;
-	}
-
-	@Override
-	public Tuple0 deserialize(Tuple0 reuse, DataInputView source) throws IOException {
-		source.readByte();
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		target.writeByte(source.readByte());
-	}
-
-	// ------------------------------------------------------------------------
-
-
-	@Override
-	public int hashCode() {
-		return Tuple0Serializer.class.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof Tuple0Serializer) {
-			Tuple0Serializer other = (Tuple0Serializer) obj;
-
-			return other.canEqual(this);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof Tuple0Serializer;
-	}
-
-	@Override
-	public String toString() {
-		return "Tuple0Serializer";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
deleted file mode 100644
index 875ecc2..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
-import org.apache.flink.types.NullFieldException;
-import org.apache.flink.types.NullKeyFieldException;
-
-
-public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> {
-
-	private static final long serialVersionUID = 1L;
-	
-	public TupleComparator(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
-		super(keyPositions, comparators, serializers);
-	}
-	
-	private TupleComparator(TupleComparator<T> toClone) {
-		super(toClone);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Comparator Methods
-	// --------------------------------------------------------------------------------------------
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public int hash(T value) {
-		int i = 0;
-		try {
-			int code = this.comparators[0].hash(value.getFieldNotNull(keyPositions[0]));
-			for (i = 1; i < this.keyPositions.length; i++) {
-				code *= HASH_SALT[i & 0x1F]; // salt code with (i % HASH_SALT.length)-th salt component
-				code += this.comparators[i].hash(value.getFieldNotNull(keyPositions[i]));
-			}
-			return code;
-		}
-		catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-		catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void setReference(T toCompare) {
-		int i = 0;
-		try {
-			for (; i < this.keyPositions.length; i++) {
-				this.comparators[i].setReference(toCompare.getFieldNotNull(this.keyPositions[i]));
-			}
-		}
-		catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-		catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public boolean equalToReference(T candidate) {
-		int i = 0;
-		try {
-			for (; i < this.keyPositions.length; i++) {
-				if (!this.comparators[i].equalToReference(candidate.getFieldNotNull(this.keyPositions[i]))) {
-					return false;
-				}
-			}
-			return true;
-		}
-		catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-		catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public int compare(T first, T second) {
-		int i = 0;
-		try {
-			for (; i < keyPositions.length; i++) {
-				int keyPos = keyPositions[i];
-				int cmp = comparators[i].compare(first.getFieldNotNull(keyPos), second.getFieldNotNull(keyPos));
-
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-			return 0;
-		} 
-		catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		}
-		catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
-		int i = 0;
-		try {
-			for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) {
-				int len = this.normalizedKeyLengths[i];
-				len = numBytes >= len ? len : numBytes;
-				this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len);
-				numBytes -= len;
-				offset += len;
-			}
-		} catch (NullFieldException nfex) {
-			throw new NullKeyFieldException(nfex);
-		} catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyPositions[i]);
-		}
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		int localIndex = index;
-		for(int i = 0; i < comparators.length; i++) {
-			localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex);
-		}
-		return localIndex - index;
-	}
-
-	public TypeComparator<T> duplicate() {
-		return new TupleComparator<T>(this);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
deleted file mode 100644
index 28169e5..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.KeyFieldOutOfBoundsException;
-import org.apache.flink.types.NullKeyFieldException;
-
-
-public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	/** key positions describe which fields are keys in what order */
-	protected int[] keyPositions;
-
-	/** comparators for the key fields, in the same order as the key fields */
-	@SuppressWarnings("rawtypes")
-	protected TypeComparator[] comparators;
-
-	protected int[] normalizedKeyLengths;
-
-	protected int numLeadingNormalizableKeys;
-
-	protected int normalizableKeyPrefixLen;
-
-	protected boolean invertNormKey;
-
-
-	/** serializers to deserialize the first n fields for comparison */
-	@SuppressWarnings("rawtypes")
-	protected TypeSerializer[] serializers;
-
-	// cache for the deserialized field objects
-	protected transient Object[] deserializedFields1;
-	protected transient Object[] deserializedFields2;
-
-
-	@SuppressWarnings("unchecked")
-	public TupleComparatorBase(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
-		// set the default utils
-		this.keyPositions = keyPositions;
-		this.comparators = (TypeComparator<Object>[]) comparators;
-		this.serializers = (TypeSerializer<Object>[]) serializers;
-
-		// set up auxiliary fields for normalized key support
-		this.normalizedKeyLengths = new int[keyPositions.length];
-		int nKeys = 0;
-		int nKeyLen = 0;
-		boolean inverted = false;
-
-		for (int i = 0; i < this.keyPositions.length; i++) {
-			TypeComparator<?> k = this.comparators[i];
-
-			// as long as the leading keys support normalized keys, we can build up the composite key
-			if (k.supportsNormalizedKey()) {
-				if (i == 0) {
-					// the first comparator decides whether we need to invert the key direction
-					inverted = k.invertNormalizedKey();
-				}
-				else if (k.invertNormalizedKey() != inverted) {
-					// if a successor does not agree on the inversion direction, it cannot be part of the normalized key
-					break;
-				}
-
-				nKeys++;
-				final int len = k.getNormalizeKeyLen();
-				if (len < 0) {
-					throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
-				}
-				this.normalizedKeyLengths[i] = len;
-				nKeyLen += len;
-
-				if (nKeyLen < 0) {
-					// overflow, which means we are out of budget for normalized key space anyways
-					nKeyLen = Integer.MAX_VALUE;
-					break;
-				}
-			} else {
-				break;
-			}
-		}
-		this.numLeadingNormalizableKeys = nKeys;
-		this.normalizableKeyPrefixLen = nKeyLen;
-		this.invertNormKey = inverted;
-	}
-
-	protected TupleComparatorBase(TupleComparatorBase<T> toClone) {
-		privateDuplicate(toClone);
-	}
-
-	// We need this because we cannot call the cloning constructor from the
-	// ScalaTupleComparator
-	protected void privateDuplicate(TupleComparatorBase<T> toClone) {
-		// copy fields and serializer factories
-		this.keyPositions = toClone.keyPositions;
-
-		this.serializers = new TypeSerializer[toClone.serializers.length];
-		for (int i = 0; i < toClone.serializers.length; i++) {
-			this.serializers[i] = toClone.serializers[i].duplicate();
-		}
-
-		this.comparators = new TypeComparator[toClone.comparators.length];
-		for (int i = 0; i < toClone.comparators.length; i++) {
-			this.comparators[i] = toClone.comparators[i].duplicate();
-		}
-
-		this.normalizedKeyLengths = toClone.normalizedKeyLengths;
-		this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
-		this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
-		this.invertNormKey = toClone.invertNormKey;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Comparator Methods
-	// --------------------------------------------------------------------------------------------
-	
-	protected int[] getKeyPositions() {
-		return this.keyPositions;
-	}
-	
-	
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public void getFlatComparator(List<TypeComparator> flatComparators) {
-		for(int i = 0; i < comparators.length; i++) {
-			if(comparators[i] instanceof CompositeTypeComparator) {
-				((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators);
-			} else {
-				flatComparators.add(comparators[i]);
-			}
-		}
-	}	
-	// --------------------------------------------------------------------------------------------
-	//  Comparator Methods
-	// --------------------------------------------------------------------------------------------
-
-
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		TupleComparatorBase<T> other = (TupleComparatorBase<T>) referencedComparator;
-		
-		int i = 0;
-		try {
-			for (; i < this.keyPositions.length; i++) {
-				@SuppressWarnings("unchecked")
-				int cmp = this.comparators[i].compareToReference(other.comparators[i]);
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-			return 0;
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		}
-		catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		if (deserializedFields1 == null) {
-			instantiateDeserializationUtils();
-		}
-		
-		int i = 0;
-		try {
-			for (; i < serializers.length; i++) {
-				deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource);
-				deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource);
-			}
-			
-			for (i = 0; i < keyPositions.length; i++) {
-				int keyPos = keyPositions[i];
-				int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]);
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-			
-			return 0;
-		} catch (NullPointerException npex) {
-			throw new NullKeyFieldException(keyPositions[i]);
-		} catch (IndexOutOfBoundsException iobex) {
-			throw new KeyFieldOutOfBoundsException(keyPositions[i], iobex);
-		}
-	}
-	
-	@Override
-	public boolean supportsNormalizedKey() {
-		return this.numLeadingNormalizableKeys > 0;
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return this.normalizableKeyPrefixLen;
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return this.numLeadingNormalizableKeys < this.keyPositions.length ||
-				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
-				this.normalizableKeyPrefixLen > keyBytes;
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return this.invertNormKey;
-	}
-	
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-	
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	protected final void instantiateDeserializationUtils() {
-		this.deserializedFields1 = new Object[this.serializers.length];
-		this.deserializedFields2 = new Object[this.serializers.length];
-		
-		for (int i = 0; i < this.serializers.length; i++) {
-			this.deserializedFields1[i] = this.serializers[i].createInstance();
-			this.deserializedFields2[i] = this.serializers[i].createInstance();
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * A sequence of prime numbers to be used for salting the computed hash values.
-	 * Based on some empirical evidence, we are using a 32-element subsequence of the  
-	 * OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
-	 * 
-	 * @see <a href="http://en.wikipedia.org/wiki/List_of_prime_numbers">http://en.wikipedia.org/wiki/List_of_prime_numbers</a>
-	 * @see <a href="http://oeis.org/A068652">http://oeis.org/A068652</a>
-	 */
-	public static final int[] HASH_SALT = new int[] {
-		73   , 79   , 97   , 113  , 131  , 197  , 199  , 311   , 
-		337  , 373  , 719  , 733  , 919  , 971  , 991  , 1193  , 
-		1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 , 
-		19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
deleted file mode 100644
index 0897063..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.NullFieldException;
-
-
-public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
-
-	private static final long serialVersionUID = 1L;
-	
-	public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
-		super(tupleClass, fieldSerializers);
-	}
-
-	@Override
-	public TupleSerializer<T> duplicate() {
-		boolean stateful = false;
-		TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer<?>[fieldSerializers.length];
-
-		for (int i = 0; i < fieldSerializers.length; i++) {
-			duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
-			if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
-				// at least one of them is stateful
-				stateful = true;
-			}
-		}
-
-		if (stateful) {
-			return new TupleSerializer<T>(tupleClass, duplicateFieldSerializers);
-		} else {
-			return this;
-		}
-	}
-
-	@Override
-	public T createInstance() {
-		try {
-			T t = tupleClass.newInstance();
-		
-			for (int i = 0; i < arity; i++) {
-				t.setField(fieldSerializers[i].createInstance(), i);
-			}
-			
-			return t;
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Cannot instantiate tuple.", e);
-		}
-	}
-
-	@Override
-	public T createInstance(Object[] fields) {
-
-		try {
-			T t = tupleClass.newInstance();
-
-			for (int i = 0; i < arity; i++) {
-				t.setField(fields[i], i);
-			}
-
-			return t;
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Cannot instantiate tuple.", e);
-		}
-	}
-
-	@Override
-	public T createOrReuseInstance(Object[] fields, T reuse) {
-		for (int i = 0; i < arity; i++) {
-			reuse.setField(fields[i], i);
-		}
-		return reuse;
-	}
-
-	@Override
-	public T copy(T from) {
-		T target = instantiateRaw();
-		for (int i = 0; i < arity; i++) {
-			Object copy = fieldSerializers[i].copy(from.getField(i));
-			target.setField(copy, i);
-		}
-		return target;
-	}
-	
-	@Override
-	public T copy(T from, T reuse) {
-		for (int i = 0; i < arity; i++) {
-			Object copy = fieldSerializers[i].copy(from.getField(i), reuse.getField(i));
-			reuse.setField(copy, i);
-		}
-		
-		return reuse;
-	}
-
-	@Override
-	public void serialize(T value, DataOutputView target) throws IOException {
-		for (int i = 0; i < arity; i++) {
-			Object o = value.getField(i);
-			try {
-				fieldSerializers[i].serialize(o, target);
-			} catch (NullPointerException npex) {
-				throw new NullFieldException(i, npex);
-			}
-		}
-	}
-
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		T tuple = instantiateRaw();
-		for (int i = 0; i < arity; i++) {
-			Object field = fieldSerializers[i].deserialize(source);
-			tuple.setField(field, i);
-		}
-		return tuple;
-	}
-	
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		for (int i = 0; i < arity; i++) {
-			Object field = fieldSerializers[i].deserialize(reuse.getField(i), source);
-			reuse.setField(field, i);
-		}
-		return reuse;
-	}
-	
-	private T instantiateRaw() {
-		try {
-			return tupleClass.newInstance();
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Cannot instantiate tuple.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
deleted file mode 100644
index fc657a1..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Objects;
-
-
-public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected final Class<T> tupleClass;
-
-	protected final TypeSerializer<Object>[] fieldSerializers;
-
-	protected final int arity;
-
-	@SuppressWarnings("unchecked")
-	public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
-		this.tupleClass = Preconditions.checkNotNull(tupleClass);
-		this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
-		this.arity = fieldSerializers.length;
-	}
-	
-	public Class<T> getTupleClass() {
-		return this.tupleClass;
-	}
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	public int getArity() {
-		return arity;
-	}
-
-	// We use this in the Aggregate and Distinct Operators to create instances
-	// of immutable Typles (i.e. Scala Tuples)
-	public abstract T createInstance(Object[] fields);
-
-	public abstract T createOrReuseInstance(Object[] fields, T reuse);
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		for (int i = 0; i < arity; i++) {
-			fieldSerializers[i].copy(source, target);
-		}
-	}
-	
-	@Override
-	public int hashCode() {
-		return 31 * Arrays.hashCode(fieldSerializers) + Objects.hash(tupleClass, arity);
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof TupleSerializerBase) {
-			TupleSerializerBase<?> other = (TupleSerializerBase<?>) obj;
-
-			return other.canEqual(this) &&
-				tupleClass == other.tupleClass &&
-				Arrays.equals(fieldSerializers, other.fieldSerializers) &&
-				arity == other.arity;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof TupleSerializerBase;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
deleted file mode 100644
index 4b9629a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-/**
- * Comparator for all Value types that extend Key
- */
-public class ValueComparator<T extends Value & Comparable<T>> extends TypeComparator<T> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> type;
-	
-	private final boolean ascendingComparison;
-	
-	private transient T reference;
-	
-	private transient T tempReference;
-	
-	private transient Kryo kryo;
-
-	@SuppressWarnings("rawtypes")
-	private final TypeComparator[] comparators = new TypeComparator[] {this};
-
-	public ValueComparator(boolean ascending, Class<T> type) {
-		this.type = type;
-		this.ascendingComparison = ascending;
-	}
-	
-	@Override
-	public int hash(T record) {
-		return record.hashCode();
-	}
-
-	@Override
-	public void setReference(T toCompare) {
-		checkKryoInitialized();
-
-		reference = KryoUtils.copy(toCompare, kryo, new ValueSerializer<T>(type));
-	}
-
-	@Override
-	public boolean equalToReference(T candidate) {
-		return candidate.equals(this.reference);
-	}
-
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		T otherRef = ((ValueComparator<T>) referencedComparator).reference;
-		int comp = otherRef.compareTo(reference);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public int compare(T first, T second) {
-		int comp = first.compareTo(second);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		if (reference == null) {
-			reference = InstantiationUtil.instantiate(type, Value.class);
-		}
-		if (tempReference == null) {
-			tempReference = InstantiationUtil.instantiate(type, Value.class);
-		}
-		
-		reference.read(firstSource);
-		tempReference.read(secondSource);
-		int comp = reference.compareTo(tempReference);
-		return ascendingComparison ? comp : -comp;
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return NormalizableKey.class.isAssignableFrom(type);
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		if (reference == null) {
-			reference = InstantiationUtil.instantiate(type, Value.class);
-		}
-		
-		NormalizableKey<?> key = (NormalizableKey<?>) reference;
-		return key.getMaxNormalizedKeyLen();
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return keyBytes < getNormalizeKeyLen();
-	}
-
-	@Override
-	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		NormalizableKey<?> key = (NormalizableKey<?>) record;
-		key.copyNormalizedKey(target, offset, numBytes);
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return !ascendingComparison;
-	}
-	
-	@Override
-	public TypeComparator<T> duplicate() {
-		return new ValueComparator<T>(ascendingComparison, type);
-	}
-	
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			this.kryo.setAsmEnabled(true);
-			this.kryo.register(type);
-		}
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = record;
-		return 1;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public TypeComparator[] getFlatComparators() {
-		return comparators;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// unsupported normalization
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
deleted file mode 100644
index 9329866..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-/**
- * Serializer for {@link Value} types. Uses the value's serialization methods, and uses
- * Kryo for deep object copies.
- *
- * @param <T> The type serialized.
- */
-public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> type;
-	
-	private transient Kryo kryo;
-	
-	private transient T copyInstance;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public ValueSerializer(Class<T> type) {
-		this.type = Preconditions.checkNotNull(type);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public ValueSerializer<T> duplicate() {
-		return new ValueSerializer<T>(type);
-	}
-	
-	@Override
-	public T createInstance() {
-		return InstantiationUtil.instantiate(this.type);
-	}
-
-	@Override
-	public T copy(T from) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, kryo, this);
-	}
-	
-	@Override
-	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, reuse, kryo, this);
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(T value, DataOutputView target) throws IOException {
-		value.write(target);
-	}
-
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		return deserialize(createInstance(), source);
-	}
-	
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		reuse.read(source);
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		if (this.copyInstance == null) {
-			this.copyInstance = InstantiationUtil.instantiate(type);
-		}
-		
-		this.copyInstance.read(source);
-		this.copyInstance.write(target);
-	}
-	
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			this.kryo.setAsmEnabled(true);
-			this.kryo.register(type);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return this.type.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ValueSerializer) {
-			ValueSerializer<?> other = (ValueSerializer<?>) obj;
-
-			return other.canEqual(this) && type == other.type;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof ValueSerializer;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
deleted file mode 100644
index a03369a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private Class<T> type;
-	
-	private final boolean ascendingComparison;
-	
-	private transient T reference;
-	
-	private transient T tempReference;
-	
-	private transient Kryo kryo;
-
-	@SuppressWarnings("rawtypes")
-	private final TypeComparator[] comparators = new TypeComparator[] {this};
-
-	public WritableComparator(boolean ascending, Class<T> type) {
-		this.type = type;
-		this.ascendingComparison = ascending;
-	}
-	
-	@Override
-	public int hash(T record) {
-		return record.hashCode();
-	}
-	
-	@Override
-	public void setReference(T toCompare) {
-		checkKryoInitialized();
-
-		reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
-	}
-	
-	@Override
-	public boolean equalToReference(T candidate) {
-		return candidate.equals(reference);
-	}
-	
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		T otherRef = ((WritableComparator<T>) referencedComparator).reference;
-		int comp = otherRef.compareTo(reference);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public int compare(T first, T second) {
-		int comp = first.compareTo(second);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		ensureReferenceInstantiated();
-		ensureTempReferenceInstantiated();
-		
-		reference.readFields(firstSource);
-		tempReference.readFields(secondSource);
-		
-		int comp = reference.compareTo(tempReference);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public boolean supportsNormalizedKey() {
-		return NormalizableKey.class.isAssignableFrom(type);
-	}
-	
-	@Override
-	public int getNormalizeKeyLen() {
-		ensureReferenceInstantiated();
-		
-		NormalizableKey<?> key = (NormalizableKey<?>) reference;
-		return key.getMaxNormalizedKeyLen();
-	}
-	
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return keyBytes < getNormalizeKeyLen();
-	}
-	
-	@Override
-	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		NormalizableKey<?> key = (NormalizableKey<?>) record;
-		key.copyNormalizedKey(target, offset, numBytes);
-	}
-	
-	@Override
-	public boolean invertNormalizedKey() {
-		return !ascendingComparison;
-	}
-	
-	@Override
-	public TypeComparator<T> duplicate() {
-		return new WritableComparator<T>(ascendingComparison, type);
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = record;
-		return 1;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public TypeComparator[] getFlatComparators() {
-		return comparators;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// unsupported normalization
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-	
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-	
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			this.kryo.setAsmEnabled(true);
-			this.kryo.register(type);
-		}
-	}
-	
-	private void ensureReferenceInstantiated() {
-		if (reference == null) {
-			reference = InstantiationUtil.instantiate(type, Writable.class);
-		}
-	}
-	
-	private void ensureTempReferenceInstantiated() {
-		if (tempReference == null) {
-			tempReference = InstantiationUtil.instantiate(type, Writable.class);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
deleted file mode 100644
index 258d92c..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-
-public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> typeClass;
-	
-	private transient Kryo kryo;
-	
-	private transient T copyInstance;
-	
-	public WritableSerializer(Class<T> typeClass) {
-		this.typeClass = typeClass;
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public T createInstance() {
-		if(typeClass == NullWritable.class) {
-			return (T) NullWritable.get();
-		}
-		return InstantiationUtil.instantiate(typeClass);
-	}
-
-
-	
-	@Override
-	public T copy(T from) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, kryo, this);
-	}
-	
-	@Override
-	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, reuse, kryo, this);
-	}
-	
-	@Override
-	public int getLength() {
-		return -1;
-	}
-	
-	@Override
-	public void serialize(T record, DataOutputView target) throws IOException {
-		record.write(target);
-	}
-	
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		return deserialize(createInstance(), source);
-	}
-	
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		reuse.readFields(source);
-		return reuse;
-	}
-	
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		ensureInstanceInstantiated();
-		copyInstance.readFields(source);
-		copyInstance.write(target);
-	}
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-	
-	@Override
-	public WritableSerializer<T> duplicate() {
-		return new WritableSerializer<T>(typeClass);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void ensureInstanceInstantiated() {
-		if (copyInstance == null) {
-			copyInstance = createInstance();
-		}
-	}
-	
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			this.kryo.setAsmEnabled(true);
-			this.kryo.register(typeClass);
-		}
-	}
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return this.typeClass.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof WritableSerializer) {
-			WritableSerializer<?> other = (WritableSerializer<?>) obj;
-
-			return other.canEqual(this) && typeClass == other.typeClass;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof WritableSerializer;
-	}
-}


Mime
View raw message