flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [09/38] flink git commit: [FLINK-3303] [core] Move all type utilities to flink-core
Date Tue, 02 Feb 2016 17:23:09 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
deleted file mode 100644
index 5187de7..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
-import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Type information for data types that extend the {@link Value} interface. The value
- * interface allows types to define their custom serialization and deserialization routines.
- *
- * @param <T> The type of the class represented by this type information.
- */
-public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class);
-	public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class);
-	public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class);
-	public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class);
-	public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class);
-	public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class);
-	public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class);
-	public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class);
-	public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class);
-	public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class);
-
-	private final Class<T> type;
-	
-	public ValueTypeInfo(Class<T> type) {
-		this.type = Preconditions.checkNotNull(type);
-
-		Preconditions.checkArgument(
-			Value.class.isAssignableFrom(type) || type.equals(Value.class),
-			"ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
-	}
-	
-	@Override
-	public int getArity() {
-		return 1;
-	}
-
-	@Override
-	public int getTotalFields() {
-		return 1;
-	}
-	
-	@Override
-	public Class<T> getTypeClass() {
-		return this.type;
-	}
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	public boolean isBasicValueType() {
-		return type.equals(StringValue.class) || type.equals(ByteValue.class) || type.equals(ShortValue.class) || type.equals(CharValue.class) ||
-				type.equals(DoubleValue.class) || type.equals(FloatValue.class) || type.equals(IntValue.class) || type.equals(LongValue.class) ||
-				type.equals(NullValue.class) || type.equals(BooleanValue.class);
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-	
-	@Override
-	public boolean isKeyType() {
-		return Comparable.class.isAssignableFrom(type);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-		if (CopyableValue.class.isAssignableFrom(type)) {
-			return (TypeSerializer<T>) createCopyableValueSerializer(type.asSubclass(CopyableValue.class));
-		}
-		else {
-			return new ValueSerializer<T>(type);
-		}
-	}
-	
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Override
-	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-		if (!isKeyType()) {
-			throw new RuntimeException("The type " + type.getName() + " is not Comparable.");
-		}
-		
-		if (CopyableValue.class.isAssignableFrom(type)) {
-			return (TypeComparator<T>) new CopyableValueComparator(sortOrderAscending, type);
-		}
-		else {
-			return (TypeComparator<T>) new ValueComparator(sortOrderAscending, type);
-		}
-	}
-	
-	// utility method to summon the necessary bound
-	private static <X extends CopyableValue<X>> CopyableValueSerializer<X> createCopyableValueSerializer(Class<X> clazz) {
-		return new CopyableValueSerializer<X>(clazz);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return this.type.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ValueTypeInfo) {
-			@SuppressWarnings("unchecked")
-			ValueTypeInfo<T> valueTypeInfo = (ValueTypeInfo<T>) obj;
-
-			return valueTypeInfo.canEqual(this) &&
-				type == valueTypeInfo.type;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof ValueTypeInfo;
-	}
-	
-	@Override
-	public String toString() {
-		return "ValueType<" + type.getSimpleName() + ">";
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) {
-		if (Value.class.isAssignableFrom(typeClass) && !typeClass.equals(Value.class)) {
-			return new ValueTypeInfo<X>(typeClass);
-		}
-		else {
-			throw new InvalidTypesException("The given class is no subclass of " + Value.class.getName());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
deleted file mode 100644
index 6c140d9..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
- * interface defines the serialization and deserialization routines for the data type.
- *
- * @param <T> The type of the class represented by this type information.
- */
-public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> typeClass;
-	
-	public WritableTypeInfo(Class<T> typeClass) {
-		this.typeClass = Preconditions.checkNotNull(typeClass);
-
-		Preconditions.checkArgument(
-			Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
-			"WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
-		if(Comparable.class.isAssignableFrom(typeClass)) {
-			return new WritableComparator(sortOrderAscending, typeClass);
-		}
-		else {
-			throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " +
-													"Class does not implement Comparable interface.");
-		}
-	}
-
-	@Override
-	public boolean isBasicType() {
-		return false;
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return false;
-	}
-
-	@Override
-	public int getArity() {
-		return 1;
-	}
-	
-	@Override
-	public int getTotalFields() {
-		return 1;
-	}
-
-	@Override
-	public Class<T> getTypeClass() {
-		return this.typeClass;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return Comparable.class.isAssignableFrom(typeClass);
-	}
-
-	@Override
-	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-		return new WritableSerializer<T>(typeClass);
-	}
-	
-	@Override
-	public String toString() {
-		return "WritableType<" + typeClass.getName() + ">";
-	}	
-	
-	@Override
-	public int hashCode() {
-		return typeClass.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof WritableTypeInfo) {
-			@SuppressWarnings("unchecked")
-			WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj;
-
-			return writableTypeInfo.canEqual(this) &&
-				typeClass == writableTypeInfo.typeClass;
-
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof WritableTypeInfo;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) {
-		if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) {
-			return new WritableTypeInfo<T>(typeClass);
-		}
-		else {
-			throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
deleted file mode 100644
index bc04367..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-
-/**
- * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
- *
- * @param <T> The type serialized.
- */
-public final class AvroSerializer<T> extends TypeSerializer<T> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> type;
-	
-	private final Class<? extends T> typeToInstantiate;
-	
-	private transient ReflectDatumWriter<T> writer;
-	private transient ReflectDatumReader<T> reader;
-	
-	private transient DataOutputEncoder encoder;
-	private transient DataInputDecoder decoder;
-	
-	private transient Kryo kryo;
-	
-	private transient T deepCopyInstance;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public AvroSerializer(Class<T> type) {
-		this(type, type);
-	}
-	
-	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
-		this.type = Preconditions.checkNotNull(type);
-		this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
-		
-		InstantiationUtil.checkForInstantiation(typeToInstantiate);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public AvroSerializer<T> duplicate() {
-		return new AvroSerializer<T>(type, typeToInstantiate);
-	}
-	
-	@Override
-	public T createInstance() {
-		return InstantiationUtil.instantiate(this.typeToInstantiate);
-	}
-
-	@Override
-	public T copy(T from) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, kryo, this);
-	}
-	
-	@Override
-	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, reuse, kryo, this);
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(T value, DataOutputView target) throws IOException {
-		checkAvroInitialized();
-		this.encoder.setOut(target);
-		this.writer.write(value, this.encoder);
-	}
-	
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		checkAvroInitialized();
-		this.decoder.setIn(source);
-		return this.reader.read(null, this.decoder);
-	}
-
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		checkAvroInitialized();
-		this.decoder.setIn(source);
-		return this.reader.read(reuse, this.decoder);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		checkAvroInitialized();
-		
-		if (this.deepCopyInstance == null) {
-			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
-		}
-		
-		this.decoder.setIn(source);
-		this.encoder.setOut(target);
-		
-		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-		this.writer.write(tmp, this.encoder);
-	}
-	
-	
-	private void checkAvroInitialized() {
-		if (this.reader == null) {
-			this.reader = new ReflectDatumReader<T>(type);
-			this.writer = new ReflectDatumWriter<T>(type);
-			this.encoder = new DataOutputEncoder();
-			this.decoder = new DataInputDecoder();
-		}
-	}
-	
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			// register Avro types.
-			this.kryo.register(GenericData.Array.class, new Serializers.SpecificInstanceCollectionSerializerForArrayList());
-			this.kryo.register(Utf8.class);
-			this.kryo.register(GenericData.EnumSymbol.class);
-			this.kryo.register(GenericData.Fixed.class);
-			this.kryo.register(GenericData.StringType.class);
-			this.kryo.setAsmEnabled(true);
-			this.kryo.register(type);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof AvroSerializer) {
-			@SuppressWarnings("unchecked")
-			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
-
-			return avroSerializer.canEqual(this) &&
-				type == avroSerializer.type &&
-				typeToInstantiate == avroSerializer.typeToInstantiate;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof AvroSerializer;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
deleted file mode 100644
index 9b3b191..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparator.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * Comparator for all Value types that extend Key
- */
-public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>> extends TypeComparator<T> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> type;
-	
-	private final boolean ascendingComparison;
-	
-	private transient T reference;
-	
-	private transient T tempReference;
-
-	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
-
-	public CopyableValueComparator(boolean ascending, Class<T> type) {
-		this.type = type;
-		this.ascendingComparison = ascending;
-		this.reference = InstantiationUtil.instantiate(type, CopyableValue.class);
-	}
-	
-	@Override
-	public int hash(T record) {
-		return record.hashCode();
-	}
-
-	@Override
-	public void setReference(T toCompare) {
-		toCompare.copyTo(reference);
-	}
-
-	@Override
-	public boolean equalToReference(T candidate) {
-		return candidate.equals(this.reference);
-	}
-
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		T otherRef = ((CopyableValueComparator<T>) referencedComparator).reference;
-		int comp = otherRef.compareTo(reference);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public int compare(T first, T second) {
-		int comp = first.compareTo(second);
-		return ascendingComparison ? comp : -comp;
-	}
-	
-	@Override
-	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		if (tempReference == null) {
-			tempReference = InstantiationUtil.instantiate(type, CopyableValue.class);
-		}
-		
-		reference.read(firstSource);
-		tempReference.read(secondSource);
-		int comp = reference.compareTo(tempReference);
-		return ascendingComparison ? comp : -comp;
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return NormalizableKey.class.isAssignableFrom(type);
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		NormalizableKey<?> key = (NormalizableKey<?>) reference;
-		return key.getMaxNormalizedKeyLen();
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return keyBytes < getNormalizeKeyLen();
-	}
-
-	@Override
-	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		NormalizableKey<?> key = (NormalizableKey<?>) record;
-		key.copyNormalizedKey(target, offset, numBytes);
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return !ascendingComparison;
-	}
-	
-	@Override
-	public TypeComparator<T> duplicate() {
-		return new CopyableValueComparator<T>(ascendingComparison, type);
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = record;
-		return 1;
-	}
-
-	@Override
-	public TypeComparator<?>[] getFlatComparators() {
-		return comparators;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// unsupported normalization
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}	
-	
-	// --------------------------------------------------------------------------------------------
-	// serialization
-	// --------------------------------------------------------------------------------------------
-	
-	private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
-		// read basic object and the type
-		s.defaultReadObject();
-		
-		this.reference = InstantiationUtil.instantiate(type, CopyableValue.class);
-		this.tempReference = null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
deleted file mode 100644
index 9e46f27..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.CopyableValue;
-import org.apache.flink.util.InstantiationUtil;
-
-
-public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSerializer<T> {
-
-	private static final long serialVersionUID = 1L;
-	
-	
-	private final Class<T> valueClass;
-	
-	private transient T instance;
-	
-	
-	public CopyableValueSerializer(Class<T> valueClass) {
-		this.valueClass = Preconditions.checkNotNull(valueClass);
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public CopyableValueSerializer<T> duplicate() {
-		return this;
-	}
-
-	@Override
-	public T createInstance() {
-		return InstantiationUtil.instantiate(this.valueClass);
-	}
-	
-	@Override
-	public T copy(T from) {
-		return copy(from, createInstance());
-	}
-	
-	@Override
-	public T copy(T from, T reuse) {
-		from.copyTo(reuse);
-		return reuse;
-	}
-
-	@Override
-	public int getLength() {
-		ensureInstanceInstantiated();
-		return instance.getBinaryLength();
-	}
-
-	@Override
-	public void serialize(T value, DataOutputView target) throws IOException {
-		value.write(target);
-	}
-
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		return deserialize(createInstance(), source);
-	}
-	
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		reuse.read(source);
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		ensureInstanceInstantiated();
-		instance.copy(source, target);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void ensureInstanceInstantiated() {
-		if (instance == null) {
-			instance = createInstance();
-		}
-	}
-	
-	@Override
-	public int hashCode() {
-		return this.valueClass.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof CopyableValueSerializer) {
-			@SuppressWarnings("unchecked")
-			CopyableValueSerializer<T> copyableValueSerializer = (CopyableValueSerializer<T>) obj;
-
-			return copyableValueSerializer.canEqual(this) &&
-				valueClass == copyableValueSerializer.valueClass;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof CopyableValueSerializer;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
deleted file mode 100644
index e48f9fa..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-
-
-/**
- * An Avro {@link Decoder} that reads its data from a {@link DataInput}.
- *
- * <p>Primitives are read with the matching {@code DataInput} methods, byte sequences and
- * strings are read as an int length followed by that many raw bytes, and array/map block
- * counts are read as variable-length longs (see {@link #readVarLongCount(DataInput)}).
- *
- * <p>Both fields are transient: after Java deserialization the string buffer is re-created
- * and the input is {@code null} until {@link #setIn(DataInput)} is called.
- */
-public class DataInputDecoder extends Decoder implements java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	/** Scratch buffer reused by {@link #readString()}. */
-	private transient Utf8 stringDecoder = new Utf8();
-
-	/** The input that all read methods draw from; {@code null} until {@link #setIn(DataInput)} is called. */
-	private transient DataInput in;
-
-
-	/**
-	 * Sets the input that subsequent read calls draw from.
-	 *
-	 * @param in the input to read from
-	 */
-	public void setIn(DataInput in) {
-		this.in = in;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// primitives
-	// --------------------------------------------------------------------------------------------
-
-	/** Reads nothing: a null value consumes no bytes from the input. */
-	@Override
-	public void readNull() {}
-
-
-	@Override
-	public boolean readBoolean() throws IOException {
-		return in.readBoolean();
-	}
-
-	@Override
-	public int readInt() throws IOException {
-		return in.readInt();
-	}
-
-	@Override
-	public long readLong() throws IOException {
-		return in.readLong();
-	}
-
-	@Override
-	public float readFloat() throws IOException {
-		return in.readFloat();
-	}
-
-	@Override
-	public double readDouble() throws IOException {
-		return in.readDouble();
-	}
-
-	/** Reads an enum ordinal, which is read exactly like an int. */
-	@Override
-	public int readEnum() throws IOException {
-		return readInt();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// bytes
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void readFixed(byte[] bytes, int start, int length) throws IOException {
-		in.readFully(bytes, start, length);
-	}
-
-	/**
-	 * Reads an int length followed by that many bytes.
-	 *
-	 * @param old a buffer to reuse; it is only reused if it is non-null, array-backed and
-	 *            large enough, otherwise a new buffer is allocated
-	 * @return a buffer whose limit is set to the number of bytes read
-	 * @throws IOException if reading from the input fails
-	 */
-	@Override
-	public ByteBuffer readBytes(ByteBuffer old) throws IOException {
-		int length = readInt();
-		ByteBuffer result;
-		if (old != null && length <= old.capacity() && old.hasArray()) {
-			result = old;
-			result.clear();
-		} else {
-			result = ByteBuffer.allocate(length);
-		}
-		in.readFully(result.array(), result.arrayOffset() + result.position(), length);
-		result.limit(length);
-		return result;
-	}
-
-
-	@Override
-	public void skipFixed(int length) throws IOException {
-		skipBytes(length);
-	}
-
-	/** Skips a length-prefixed byte sequence: reads the int length, then skips that many bytes. */
-	@Override
-	public void skipBytes() throws IOException {
-		int num = readInt();
-		skipBytes(num);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// strings
-	// --------------------------------------------------------------------------------------------
-
-
-	/**
-	 * Reads an int length followed by that many bytes into a {@link Utf8}.
-	 *
-	 * @param old an instance to reuse, or {@code null} to create a new one
-	 * @return {@code old} if it was non-null, otherwise a new instance, holding the bytes read
-	 * @throws IOException if reading from the input fails
-	 */
-	@Override
-	public Utf8 readString(Utf8 old) throws IOException {
-		int length = readInt();
-		Utf8 result = (old != null ? old : new Utf8());
-		result.setByteLength(length);
-
-		if (length > 0) {
-			in.readFully(result.getBytes(), 0, length);
-		}
-
-		return result;
-	}
-
-	@Override
-	public String readString() throws IOException {
-		return readString(stringDecoder).toString();
-	}
-
-	/** Skips a length-prefixed string: reads the int length, then skips that many bytes. */
-	@Override
-	public void skipString() throws IOException {
-		int len = readInt();
-		skipBytes(len);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// collection types
-	// --------------------------------------------------------------------------------------------
-
-	// All six methods below only read and return the next variable-length block count.
-
-	@Override
-	public long readArrayStart() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long arrayNext() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long skipArray() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long readMapStart() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long mapNext() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long skipMap() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// union
-	// --------------------------------------------------------------------------------------------
-
-	/** Reads a union index, which is read exactly like an int. */
-	@Override
-	public int readIndex() throws IOException {
-		return readInt();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// utils
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Skips exactly {@code num} bytes of the input.
-	 *
-	 * @param num the number of bytes to skip; values {@code <= 0} skip nothing
-	 * @throws java.io.EOFException if the input ends before {@code num} bytes were skipped
-	 * @throws IOException if reading from the input fails
-	 */
-	private void skipBytes(int num) throws IOException {
-		while (num > 0) {
-			int skipped = in.skipBytes(num);
-			if (skipped > 0) {
-				num -= skipped;
-			}
-			else {
-				// DataInput.skipBytes() may return 0 and never signals end of input, which
-				// would make this loop spin forever. A single-byte read either makes progress
-				// or throws an EOFException at the end of the input.
-				in.readByte();
-				num--;
-			}
-		}
-	}
-
-	/**
-	 * Reads a variable-length encoded count: each byte carries seven value bits, least
-	 * significant group first, and a set high bit means another byte follows.
-	 *
-	 * @param in the input to read from
-	 * @return the decoded value
-	 * @throws IOException if reading from the input fails
-	 */
-	public static long readVarLongCount(DataInput in) throws IOException {
-		long value = in.readUnsignedByte();
-
-		if ((value & 0x80) == 0) {
-			return value;
-		}
-		else {
-			long curr;
-			int shift = 7;
-			value = value & 0x7f;
-			while (((curr = in.readUnsignedByte()) & 0x80) != 0){
-				value |= (curr & 0x7f) << shift;
-				shift += 7;
-			}
-			// the last byte has no continuation bit, so it can be merged in unmasked
-			value |= curr << shift;
-			return value;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// serialization
-	// --------------------------------------------------------------------------------------------
-
-	/** Restores the transient fields after Java deserialization; the input has to be set again. */
-	private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
-		// Read in size, and any hidden stuff
-		s.defaultReadObject();
-
-		this.stringDecoder = new Utf8();
-		this.in = null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
deleted file mode 100644
index be17d64..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * An input stream that draws its data from a {@link DataInputView}.
- */
-public class DataInputViewStream extends InputStream {
-
-	/** The view that all reads and skips are forwarded to. */
-	protected DataInputView inputView;
-
-	/**
-	 * Creates a stream on top of the given view.
-	 *
-	 * @param inputView the view to read from
-	 */
-	public DataInputViewStream(DataInputView inputView) {
-		this.inputView = inputView;
-	}
-
-	/**
-	 * @return the view this stream reads from
-	 */
-	public DataInputView getInputView(){
-		return inputView;
-	}
-
-	/**
-	 * Reads one byte from the view.
-	 *
-	 * @return the byte as a value in the range 0 to 255, or {@code -1} if the view
-	 *         signals the end of its data with an {@link EOFException}
-	 * @throws IOException if the view fails for another reason
-	 */
-	@Override
-	public int read() throws IOException {
-		try {
-			return inputView.readUnsignedByte();
-		} catch(EOFException ex) {
-			// the InputStream contract reports the end of the stream as -1
-			return -1;
-		}
-	}
-
-	/**
-	 * Skips up to {@code n} bytes of the view.
-	 *
-	 * @param n the number of bytes to skip; non-positive values skip nothing
-	 * @return the number of bytes actually skipped, which is never negative
-	 * @throws IOException if the view fails while skipping
-	 */
-	@Override
-	public long skip(long n) throws IOException {
-		if (n <= 0) {
-			// nothing to skip; also avoids handing a negative count to the view
-			return 0;
-		}
-
-		long remaining = n;
-
-		// the view only skips int-sized amounts, so larger requests are skipped in chunks
-		while (remaining > Integer.MAX_VALUE) {
-			int skippedBytes = inputView.skipBytes(Integer.MAX_VALUE);
-
-			if (skippedBytes == 0) {
-				return n - remaining;
-			}
-
-			remaining -= skippedBytes;
-		}
-
-		remaining -= inputView.skipBytes((int) remaining);
-
-		// skipped = requested - still outstanding
-		return n - remaining;
-	}
-
-	@Override
-	public int read(byte[] b, int off, int len) throws IOException {
-		return inputView.read(b, off, len);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
deleted file mode 100644
index 5c89962..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-
-
-/**
- * An Avro {@link Encoder} that writes its data to a {@link DataOutput}.
- *
- * <p>Primitives are written with the matching {@code DataOutput} methods, byte sequences
- * and strings are written as an int length followed by the raw bytes, and item counts are
- * written as variable-length longs (see {@link #writeVarLongCount(DataOutput, long)}).
- *
- * <p>The output is transient; after Java deserialization it is {@code null} until
- * {@link #setOut(DataOutput)} is called.
- */
-public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	/** The output that all write methods target. */
-	private transient DataOutput out;
-
-
-	/**
-	 * Sets the output that subsequent write calls target.
-	 *
-	 * @param out the output to write to
-	 */
-	public void setOut(DataOutput out) {
-		this.out = out;
-	}
-
-
-	/** Does nothing. */
-	@Override
-	public void flush() throws IOException {
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// primitives
-	// --------------------------------------------------------------------------------------------
-
-	/** Writes nothing: a null value produces no bytes. */
-	@Override
-	public void writeNull() {
-	}
-
-
-	@Override
-	public void writeBoolean(final boolean b) throws IOException {
-		this.out.writeBoolean(b);
-	}
-
-	@Override
-	public void writeInt(final int n) throws IOException {
-		this.out.writeInt(n);
-	}
-
-	@Override
-	public void writeLong(final long n) throws IOException {
-		this.out.writeLong(n);
-	}
-
-	@Override
-	public void writeFloat(final float f) throws IOException {
-		this.out.writeFloat(f);
-	}
-
-	@Override
-	public void writeDouble(final double d) throws IOException {
-		this.out.writeDouble(d);
-	}
-
-	/** Writes an enum ordinal as an int. */
-	@Override
-	public void writeEnum(final int e) throws IOException {
-		this.out.writeInt(e);
-	}
-
-
-	// --------------------------------------------------------------------------------------------
-	// bytes
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void writeFixed(final byte[] bytes, final int start, final int len) throws IOException {
-		this.out.write(bytes, start, len);
-	}
-
-	/** Writes the int length, followed by the bytes themselves if there are any. */
-	@Override
-	public void writeBytes(final byte[] bytes, final int start, final int len) throws IOException {
-		this.out.writeInt(len);
-		if (len <= 0) {
-			return;
-		}
-		this.out.write(bytes, start, len);
-	}
-
-	/** Writes the number of remaining bytes as an int, followed by those bytes if there are any. */
-	@Override
-	public void writeBytes(final ByteBuffer bytes) throws IOException {
-		final int remaining = bytes.remaining();
-		this.out.writeInt(remaining);
-
-		if (remaining <= 0) {
-			return;
-		}
-		writeFixed(bytes);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// strings
-	// --------------------------------------------------------------------------------------------
-
-	/** Writes the string as a length-prefixed sequence of the bytes from {@code Utf8.getBytesFor}. */
-	@Override
-	public void writeString(final String str) throws IOException {
-		final byte[] encoded = Utf8.getBytesFor(str);
-		writeBytes(encoded, 0, encoded.length);
-	}
-
-	/** Writes the first {@code getByteLength()} bytes of the given {@link Utf8}, length-prefixed. */
-	@Override
-	public void writeString(final Utf8 utf8) throws IOException {
-		final byte[] raw = utf8.getBytes();
-		final int byteLength = utf8.getByteLength();
-		writeBytes(raw, 0, byteLength);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// collection types
-	// --------------------------------------------------------------------------------------------
-
-	/** Writes nothing; counts are written by {@link #setItemCount(long)}. */
-	@Override
-	public void writeArrayStart() {
-	}
-
-	/** Writes the item count as a variable-length long, but only if it is positive. */
-	@Override
-	public void setItemCount(final long itemCount) throws IOException {
-		if (itemCount <= 0) {
-			return;
-		}
-		writeVarLongCount(this.out, itemCount);
-	}
-
-	/** Writes nothing. */
-	@Override
-	public void startItem() {
-	}
-
-	@Override
-	public void writeArrayEnd() throws IOException {
-		writeEndMarker();
-	}
-
-	/** Writes nothing; counts are written by {@link #setItemCount(long)}. */
-	@Override
-	public void writeMapStart() {
-	}
-
-	@Override
-	public void writeMapEnd() throws IOException {
-		writeEndMarker();
-	}
-
-	/** Terminates an array or map with a single zero byte, the var-length encoding of 0. */
-	private void writeEndMarker() throws IOException {
-		this.out.write(0);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// union
-	// --------------------------------------------------------------------------------------------
-
-	/** Writes a union index as an int. */
-	@Override
-	public void writeIndex(final int unionIndex) throws IOException {
-		this.out.writeInt(unionIndex);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// utils
-	// --------------------------------------------------------------------------------------------
-
-
-	/**
-	 * Writes a non-negative count in a variable-length encoding: seven value bits per byte,
-	 * least significant group first, with the high bit set on every byte except the last.
-	 *
-	 * @param out the output to write to
-	 * @param val the count to write
-	 * @throws IOException if {@code val} is negative or if writing fails
-	 */
-	public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
-		if (val < 0) {
-			throw new IOException("Illegal count (must be non-negative): " + val);
-		}
-
-		long rest = val;
-		for (; (rest & ~0x7FL) != 0; rest >>>= 7) {
-			// DataOutput.write(int) only emits the low eight bits
-			out.write(((int) rest) | 0x80);
-		}
-		out.write((int) rest);
-	}
-
-	/** Resets the transient output after Java deserialization; it has to be set again. */
-	private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
-		// Read in size, and any hidden stuff
-		s.defaultReadObject();
-
-		this.out = null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
deleted file mode 100644
index 66f2af6..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataOutputView;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * An {@link OutputStream} that forwards everything written to it to a {@link DataOutputView}.
- */
-public class DataOutputViewStream extends OutputStream {
-
-	/** The view that receives all written data. */
-	protected DataOutputView outputView;
-
-	/**
-	 * Creates a stream on top of the given view.
-	 *
-	 * @param outputView the view to write to
-	 */
-	public DataOutputViewStream(final DataOutputView outputView) {
-		this.outputView = outputView;
-	}
-
-	/** Forwards the value to {@code writeByte} of the view. */
-	@Override
-	public void write(final int b) throws IOException {
-		this.outputView.writeByte(b);
-	}
-
-	/** Forwards the given array range to {@code write} of the view. */
-	@Override
-	public void write(final byte[] b, final int off, final int len) throws IOException {
-		this.outputView.write(b, off, len);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
deleted file mode 100644
index b4b95f3..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.apache.flink.api.java.typeutils.Either.Left;
-import static org.apache.flink.api.java.typeutils.Either.Right;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.Either;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * A {@link TypeSerializer} for the {@link Either} type.
- *
- * <p>A value is written as a boolean flag ({@code true} for a Left value, {@code false}
- * for a Right value) followed by the value itself, written with the serializer of the
- * respective side.
- *
- * @param <L> the Left value type
- * @param <R> the Right value type
- */
-public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** Serializer for Left values. */
-	private final TypeSerializer<L> leftSerializer;
-
-	/** Serializer for Right values. */
-	private final TypeSerializer<R> rightSerializer;
-
-	/**
-	 * Creates a serializer from the serializers of both sides.
-	 *
-	 * @param leftSerializer serializer for Left values
-	 * @param rightSerializer serializer for Right values
-	 */
-	public EitherSerializer(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
-		this.leftSerializer = leftSerializer;
-		this.rightSerializer = rightSerializer;
-	}
-
-	/** The type is immutable only if both sides are immutable. */
-	@Override
-	public boolean isImmutableType() {
-		if (!leftSerializer.isImmutableType()) {
-			return false;
-		}
-		return rightSerializer.isImmutableType();
-	}
-
-	/**
-	 * Duplicates both nested serializers; returns {@code this} if both duplicates are the
-	 * very same objects as the originals, otherwise a new serializer built from the duplicates.
-	 */
-	@Override
-	public TypeSerializer<Either<L, R>> duplicate() {
-		final TypeSerializer<L> leftCopy = leftSerializer.duplicate();
-		final TypeSerializer<R> rightCopy = rightSerializer.duplicate();
-
-		final boolean unchanged = (leftSerializer == leftCopy) && (rightSerializer == rightCopy);
-		if (unchanged) {
-			return this;
-		}
-
-		// at least one nested serializer is stateful
-		return new EitherSerializer<L, R>(leftCopy, rightCopy);
-	}
-
-
-	/** Creates a Right value wrapping a new instance from the right serializer. */
-	@Override
-	public Either<L, R> createInstance() {
-		// The choice of the Right side is arbitrary.
-		final R rightInstance = rightSerializer.createInstance();
-		return Right(rightInstance);
-	}
-
-	/** Copies the wrapped value with the serializer of its side and wraps the copy again. */
-	@Override
-	public Either<L, R> copy(Either<L, R> from) {
-		if (!from.isLeft()) {
-			return Right(rightSerializer.copy(from.right()));
-		}
-		return Left(leftSerializer.copy(from.left()));
-	}
-
-	/**
-	 * Copies the wrapped value. The value inside {@code reuse} is only handed to the right
-	 * serializer when both {@code from} and {@code reuse} are Right values.
-	 */
-	@Override
-	public Either<L, R> copy(Either<L, R> from, Either<L, R> reuse) {
-		if (!from.isRight()) {
-			// reuse record is never a left value because we always create a right instance
-			return Left(leftSerializer.copy(from.left()));
-		}
-
-		final R source = from.right();
-		if (reuse.isRight()) {
-			return Right(rightSerializer.copy(source, reuse.right()));
-		}
-
-		// the reuse record is not a right value, so there is nothing to reuse
-		return Right(rightSerializer.copy(source));
-	}
-
-	/** Always {@code -1}. */
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	/** Writes the side flag ({@code true} for Left) followed by the wrapped value. */
-	@Override
-	public void serialize(Either<L, R> record, DataOutputView target) throws IOException {
-		final boolean left = record.isLeft();
-		target.writeBoolean(left);
-
-		if (left) {
-			leftSerializer.serialize(record.left(), target);
-		}
-		else {
-			rightSerializer.serialize(record.right(), target);
-		}
-	}
-
-	/** Reads the side flag and then the value with the serializer of that side. */
-	@Override
-	public Either<L, R> deserialize(DataInputView source) throws IOException {
-		if (source.readBoolean()) {
-			return Left(leftSerializer.deserialize(source));
-		}
-		return Right(rightSerializer.deserialize(source));
-	}
-
-	/**
-	 * Reads the side flag and then the value. The value inside {@code reuse} is only handed
-	 * to the right serializer when a Right value is read and {@code reuse} is a Right value.
-	 */
-	@Override
-	public Either<L, R> deserialize(Either<L, R> reuse, DataInputView source) throws IOException {
-		final boolean left = source.readBoolean();
-		if (left) {
-			// reuse record is never a left value because we always create a right instance
-			return Left(leftSerializer.deserialize(source));
-		}
-
-		if (reuse.isRight()) {
-			return Right(rightSerializer.deserialize(reuse.right(), source));
-		}
-
-		// the reuse record is not a right value, so there is nothing to reuse
-		return Right(rightSerializer.deserialize(source));
-	}
-
-	/** Copies the side flag and lets the serializer of that side copy the value. */
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		final boolean left = source.readBoolean();
-		target.writeBoolean(left);
-
-		if (!left) {
-			rightSerializer.copy(source, target);
-		}
-		else {
-			leftSerializer.copy(source, target);
-		}
-	}
-
-	/**
-	 * Equal to another {@code EitherSerializer} that agrees via {@code canEqual(this)} and
-	 * has equal left and right serializers.
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public boolean equals(Object obj) {
-		if (!(obj instanceof EitherSerializer)) {
-			return false;
-		}
-
-		final EitherSerializer<L, R> that = (EitherSerializer<L, R>) obj;
-
-		return that.canEqual(this)
-			&& leftSerializer.equals(that.leftSerializer)
-			&& rightSerializer.equals(that.rightSerializer);
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof EitherSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		final int leftHash = leftSerializer.hashCode();
-		final int rightHash = rightSerializer.hashCode();
-		return 17 * leftHash + rightHash;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
deleted file mode 100644
index 28fea6a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NormalizableKey;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * A {@link TypeComparator} for types that implement {@link Comparable}. Comparisons are
- * delegated to {@code compareTo} and negated when the comparator is descending.
- */
-public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparator<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** {@code true} for ascending order; {@code false} negates every comparison result. */
-	private final boolean ascending;
-
-	/** The compared class; used for the normalized-key checks and instantiation. */
-	private final Class<T> type;
-
-	/** Used to copy, create and deserialize instances of the compared type. */
-	private TypeSerializer<T> serializer;
-
-	/** The current reference value; also used as a scratch object when comparing serialized data. */
-	private transient T reference;
-
-	/** Second scratch object for comparing serialized data. */
-	private transient T tmpReference;
-
-	@SuppressWarnings("rawtypes")
-	private final TypeComparator[] comparators = new TypeComparator[] {this};
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * @param ascending whether the comparator sorts in ascending order
-	 * @param serializer the serializer for the compared type
-	 * @param type the class of the compared type
-	 */
-	public GenericTypeComparator(boolean ascending, TypeSerializer<T> serializer, Class<T> type) {
-		this.type = type;
-		this.serializer = serializer;
-		this.ascending = ascending;
-	}
-
-	/** Copy constructor used by {@link #duplicate()}; the serializer is duplicated. */
-	private GenericTypeComparator(GenericTypeComparator<T> toClone) {
-		this.type = toClone.type;
-		this.ascending = toClone.ascending;
-		this.serializer = toClone.serializer.duplicate();
-	}
-
-	/** Returns {@code cmp} for ascending order and {@code -cmp} for descending order. */
-	private int applySortOrder(int cmp) {
-		if (this.ascending) {
-			return cmp;
-		}
-		return -cmp;
-	}
-
-	@Override
-	public int hash(T record) {
-		return record.hashCode();
-	}
-
-	/** Stores a copy of the given value, made by the serializer, as the reference. */
-	@Override
-	public void setReference(T toCompare) {
-		final T referenceCopy = this.serializer.copy(toCompare);
-		this.reference = referenceCopy;
-	}
-
-	@Override
-	public boolean equalToReference(T candidate) {
-		return candidate.equals(this.reference);
-	}
-
-	/** Compares the other comparator's reference to this comparator's reference. */
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		final GenericTypeComparator<T> other = (GenericTypeComparator<T>) referencedComparator;
-		final int cmp = other.reference.compareTo(this.reference);
-
-		return applySortOrder(cmp);
-	}
-
-	@Override
-	public int compare(T first, T second) {
-		return applySortOrder(first.compareTo(second));
-	}
-
-	/**
-	 * Deserializes one value from each source and compares them. The values are read into
-	 * {@code reference} and {@code tmpReference}, so a previously set reference is overwritten.
-	 */
-	@Override
-	public int compareSerialized(final DataInputView firstSource, final DataInputView secondSource) throws IOException {
-
-		// lazily create the two scratch objects
-		if (this.reference == null) {
-			this.reference = this.serializer.createInstance();
-		}
-		if (this.tmpReference == null) {
-			this.tmpReference = this.serializer.createInstance();
-		}
-
-		this.reference = this.serializer.deserialize(this.reference, firstSource);
-		this.tmpReference = this.serializer.deserialize(this.tmpReference, secondSource);
-
-		return applySortOrder(this.reference.compareTo(this.tmpReference));
-	}
-
-	/** Normalized keys are supported if the compared class is a {@link NormalizableKey}. */
-	@Override
-	public boolean supportsNormalizedKey() {
-		return NormalizableKey.class.isAssignableFrom(this.type);
-	}
-
-	/** Asks the reference for its maximal normalized key length, instantiating it first if needed. */
-	@Override
-	public int getNormalizeKeyLen() {
-		if (this.reference == null) {
-			this.reference = InstantiationUtil.instantiate(this.type);
-		}
-
-		return ((NormalizableKey<?>) this.reference).getMaxNormalizedKeyLen();
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		final int fullKeyLength = getNormalizeKeyLen();
-		return keyBytes < fullKeyLength;
-	}
-
-	@Override
-	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-		((NormalizableKey<?>) record).copyNormalizedKey(target, offset, numBytes);
-	}
-
-	/** The normalized key is inverted for descending order. */
-	@Override
-	public boolean invertNormalizedKey() {
-		return !this.ascending;
-	}
-
-	@Override
-	public TypeComparator<T> duplicate() {
-		return new GenericTypeComparator<T>(this);
-	}
-
-	/** The record itself is the single key. */
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = record;
-		return 1;
-	}
-
-	/** Returns the one-element array that contains this comparator. */
-	@SuppressWarnings("rawtypes")
-	@Override
-	public TypeComparator[] getFlatComparators() {
-		return this.comparators;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	/** Not supported. */
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	/** Not supported. */
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
deleted file mode 100644
index faf5646..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-
-/**
- * Static helpers for copying objects with Kryo.
- */
-public class KryoUtils {
-
-	/** Message of the exception thrown when the serialization fallback fails as well. */
-	private static final String COPY_FAILED_MESSAGE =
-		"Could not copy object by serializing/deserializing it.";
-
-	/**
-	 * Copies the given element with {@code kryo.copy}. If that throws a {@link KryoException},
-	 * the element is instead serialized to a byte array with the given serializer and
-	 * deserialized from there.
-	 *
-	 * @param from Element to copy
-	 * @param kryo Kryo instance to use
-	 * @param serializer TypeSerializer which is used in case of a Kryo failure
-	 * @param <T> Type of the element to be copied
-	 * @return Copied element
-	 */
-	public static <T> T copy(T from, Kryo kryo, TypeSerializer<T> serializer) {
-		try {
-			return kryo.copy(from);
-		} catch (KryoException kryoFailure) {
-			// Kryo could not copy the object --> go through the serialized form instead
-			return copyViaSerialization(from, serializer);
-		}
-	}
-
-	/**
-	 * Copies the given element with {@code kryo.copy}. If that throws a {@link KryoException},
-	 * the element is instead serialized to a byte array with the given serializer and
-	 * deserialized from there, passing {@code reuse} to the deserialization.
-	 *
-	 * @param from Element to copy
-	 * @param reuse Reuse element for the deserialization
-	 * @param kryo Kryo instance to use
-	 * @param serializer TypeSerializer which is used in case of a Kryo failure
-	 * @param <T> Type of the element to be copied
-	 * @return Copied element
-	 */
-	public static <T> T copy(T from, T reuse, Kryo kryo, TypeSerializer<T> serializer) {
-		try {
-			return kryo.copy(from);
-		} catch (KryoException kryoFailure) {
-			// Kryo could not copy the object --> go through the serialized form instead
-			return copyViaSerialization(from, reuse, serializer);
-		}
-	}
-
-	/** Serializes the element to a byte array and deserializes a copy from it. */
-	private static <T> T copyViaSerialization(T from, TypeSerializer<T> serializer) {
-		try {
-			final byte[] serialized = InstantiationUtil.serializeToByteArray(serializer, from);
-
-			return InstantiationUtil.deserializeFromByteArray(serializer, serialized);
-		} catch (IOException cause) {
-			throw new RuntimeException(COPY_FAILED_MESSAGE, cause);
-		}
-	}
-
-	/** Serializes the element to a byte array and deserializes a copy from it, passing the reuse element along. */
-	private static <T> T copyViaSerialization(T from, T reuse, TypeSerializer<T> serializer) {
-		try {
-			final byte[] serialized = InstantiationUtil.serializeToByteArray(serializer, from);
-
-			return InstantiationUtil.deserializeFromByteArray(serializer, reuse, serialized);
-		} catch (IOException cause) {
-			throw new RuntimeException(COPY_FAILED_MESSAGE, cause);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
deleted file mode 100644
index 0f4fe94..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.io.Input;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class NoFetchingInput extends Input {
-	public NoFetchingInput(InputStream inputStream){
-		super(inputStream, 8);
-	}
-
-	@Override
-	public boolean eof(){
-		throw new UnsupportedOperationException("NoFetchingInput does not support EOF.");
-	}
-
-	@Override
-	public int read() throws KryoException {
-		require(1);
-		return buffer[position++] & 0xFF;
-	}
-
-	@Override
-	public boolean canReadInt() throws KryoException {
-		throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
-	}
-
-	@Override
-	public boolean canReadLong() throws KryoException {
-		throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
-	}
-
-	/**
-	 * Require makes sure that at least required number of bytes are kept in the buffer. If not, then
-	 * it will load exactly the difference between required and currently available number of bytes.
-	 * Thus, it will only load the data which is required and never prefetch data.
-	 *
-	 * @param required the number of bytes being available in the buffer
-	 * @return the number of bytes remaining, which is equal to required
-	 * @throws KryoException
-	 */
-	@Override
-	protected int require(int required) throws KryoException {
-		if(required > capacity) {
-			throw new KryoException("Buffer too small: capacity: " + capacity + ", " +
-					"required: " + required);
-		}
-
-		position = 0;
-		int bytesRead = 0;
-		int count;
-		while(true){
-			count = fill(buffer, bytesRead, required - bytesRead);
-
-			if(count == -1){
-				throw new KryoException(new EOFException("No more bytes left."));
-			}
-
-			bytesRead += count;
-			if(bytesRead == required){
-				break;
-			}
-		}
-		limit = required;
-		return required;
-	}
-
-	@Override
-	public int read(byte[] bytes, int offset, int count) throws KryoException {
-		if(bytes == null){
-			throw new IllegalArgumentException("bytes cannot be null.");
-		}
-
-		try {
-			return inputStream.read(bytes, offset, count);
-		}catch(IOException ex){
-			throw new KryoException(ex);
-		}
-	}
-
-	@Override
-	public void skip(int count) throws KryoException {
-		try{
-			inputStream.skip(count);
-		}catch(IOException ex){
-			throw new KryoException(ex);
-		}
-	}
-
-	@Override
-	public void readBytes(byte[] bytes, int offset, int count) throws KryoException {
-		if(bytes == null){
-			throw new IllegalArgumentException("bytes cannot be null.");
-		}
-
-		try{
-			int bytesRead = 0;
-			int c;
-
-			while(true){
-				c = inputStream.read(bytes, offset+bytesRead, count-bytesRead);
-
-				if(c == -1){
-					throw new KryoException(new EOFException("No more bytes left."));
-				}
-
-				bytesRead += c;
-
-				if(bytesRead == count){
-					break;
-				}
-			}
-		}catch(IOException ex){
-			throw new KryoException(ex);
-		}
-	}
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
deleted file mode 100644
index c0c7797..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.util.InstantiationUtil;
-
-
-public final class PojoComparator<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-
-	// Reflection fields for the comp fields
-	private transient Field[] keyFields;
-
-	private final TypeComparator<Object>[] comparators;
-
-	private final int[] normalizedKeyLengths;
-
-	private final int numLeadingNormalizableKeys;
-
-	private final int normalizableKeyPrefixLen;
-
-	private final boolean invertNormKey;
-
-	private TypeSerializer<T> serializer;
-
-	private final Class<T> type;
-
-	@SuppressWarnings("unchecked")
-	public PojoComparator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, Class<T> type) {
-		this.keyFields = keyFields;
-		this.comparators = (TypeComparator<Object>[]) comparators;
-
-		this.type = type;
-		this.serializer = serializer;
-
-		// set up auxiliary fields for normalized key support
-		this.normalizedKeyLengths = new int[keyFields.length];
-		int nKeys = 0;
-		int nKeyLen = 0;
-		boolean inverted = false;
-
-		for (int i = 0; i < this.comparators.length; i++) {
-			TypeComparator<?> k = this.comparators[i];
-			if(k == null) {
-				throw new IllegalArgumentException("One of the passed comparators is null");
-			}
-			if(keyFields[i] == null) {
-				throw new IllegalArgumentException("One of the passed reflection fields is null");
-			}
-
-			// as long as the leading keys support normalized keys, we can build up the composite key
-			if (k.supportsNormalizedKey()) {
-				if (i == 0) {
-					// the first comparator decides whether we need to invert the key direction
-					inverted = k.invertNormalizedKey();
-				}
-				else if (k.invertNormalizedKey() != inverted) {
-					// if a successor does not agree on the invertion direction, it cannot be part of the normalized key
-					break;
-				}
-
-				nKeys++;
-				final int len = k.getNormalizeKeyLen();
-				if (len < 0) {
-					throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
-				}
-				this.normalizedKeyLengths[i] = len;
-				nKeyLen += this.normalizedKeyLengths[i];
-
-				if (nKeyLen < 0) {
-					// overflow, which means we are out of budget for normalized key space anyways
-					nKeyLen = Integer.MAX_VALUE;
-					break;
-				}
-			} else {
-				break;
-			}
-		}
-		this.numLeadingNormalizableKeys = nKeys;
-		this.normalizableKeyPrefixLen = nKeyLen;
-		this.invertNormKey = inverted;
-	}
-
-	@SuppressWarnings("unchecked")
-	private PojoComparator(PojoComparator<T> toClone) {
-		this.keyFields = toClone.keyFields;
-		this.comparators = new TypeComparator[toClone.comparators.length];
-
-		for (int i = 0; i < toClone.comparators.length; i++) {
-			this.comparators[i] = toClone.comparators[i].duplicate();
-		}
-
-		this.normalizedKeyLengths = toClone.normalizedKeyLengths;
-		this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
-		this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
-		this.invertNormKey = toClone.invertNormKey;
-
-		this.type = toClone.type;
-
-		try {
-			this.serializer = (TypeSerializer<T>) InstantiationUtil.deserializeObject(
-					InstantiationUtil.serializeObject(toClone.serializer), Thread.currentThread().getContextClassLoader());
-		} catch (IOException e) {
-			throw new RuntimeException("Cannot copy serializer", e);
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Cannot copy serializer", e);
-		}
-	}
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		out.writeInt(keyFields.length);
-		for (Field field: keyFields) {
-			out.writeObject(field.getDeclaringClass());
-			out.writeUTF(field.getName());
-		}
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		int numKeyFields = in.readInt();
-		keyFields = new Field[numKeyFields];
-		for (int i = 0; i < numKeyFields; i++) {
-			Class<?> clazz = (Class<?>) in.readObject();
-			String fieldName = in.readUTF();
-			// try superclasses as well
-			while (clazz != null) {
-				try {
-					Field field = clazz.getDeclaredField(fieldName);
-					field.setAccessible(true);
-					keyFields[i] = field;
-					break;
-				} catch (NoSuchFieldException e) {
-					clazz = clazz.getSuperclass();
-				}
-			}
-			if (keyFields[i] == null ) {
-				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-						+ " (" + fieldName + ")");
-			}
-		}
-	}
-
-	public Field[] getKeyFields() {
-		return this.keyFields;
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public void getFlatComparator(List<TypeComparator> flatComparators) {
-		for(int i = 0; i < comparators.length; i++) {
-			if(comparators[i] instanceof CompositeTypeComparator) {
-				((CompositeTypeComparator)comparators[i]).getFlatComparator(flatComparators);
-			} else {
-				flatComparators.add(comparators[i]);
-			}
-		}
-	}
-	
-	/**
-	 * This method is handling the IllegalAccess exceptions of Field.get()
-	 */
-	public final Object accessField(Field field, Object object) {
-		try {
-			object = field.get(object);
-		} catch (NullPointerException npex) {
-			throw new NullKeyFieldException("Unable to access field "+field+" on object "+object);
-		} catch (IllegalAccessException iaex) {
-			throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo."
-			+ " fields: " + field + " obj: " + object);
-		}
-		return object;
-	}
-
-	@Override
-	public int hash(T value) {
-		int i = 0;
-		int code = 0;
-		for (; i < this.keyFields.length; i++) {
-			code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
-			try {
-				code += this.comparators[i].hash(accessField(keyFields[i], value));
-			}catch(NullPointerException npe) {
-				throw new RuntimeException("A NullPointerException occured while accessing a key field in a POJO. " +
-						"Most likely, the value grouped/joined on is null. Field name: "+keyFields[i].getName(), npe);
-			}
-		}
-		return code;
-
-	}
-
-	@Override
-	public void setReference(T toCompare) {
-		int i = 0;
-		for (; i < this.keyFields.length; i++) {
-			this.comparators[i].setReference(accessField(keyFields[i], toCompare));
-		}
-	}
-
-	@Override
-	public boolean equalToReference(T candidate) {
-		int i = 0;
-		for (; i < this.keyFields.length; i++) {
-			if (!this.comparators[i].equalToReference(accessField(keyFields[i], candidate))) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	@Override
-	public int compareToReference(TypeComparator<T> referencedComparator) {
-		PojoComparator<T> other = (PojoComparator<T>) referencedComparator;
-
-		int i = 0;
-		try {
-			for (; i < this.keyFields.length; i++) {
-				int cmp = this.comparators[i].compareToReference(other.comparators[i]);
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-			return 0;
-		}
-		catch (NullPointerException npex) {
-			throw new NullKeyFieldException(this.keyFields[i].toString());
-		}
-	}
-
-	@Override
-	public int compare(T first, T second) {
-		int i = 0;
-		for (; i < keyFields.length; i++) {
-			int cmp = comparators[i].compare(accessField(keyFields[i], first), accessField(keyFields[i], second));
-			if (cmp != 0) {
-				return cmp;
-			}
-		}
-
-		return 0;
-	}
-
-	
-	@Override
-	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-		T first = this.serializer.createInstance();
-		T second = this.serializer.createInstance();
-
-		first = this.serializer.deserialize(first, firstSource);
-		second = this.serializer.deserialize(second, secondSource);
-
-		return this.compare(first, second);
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return this.numLeadingNormalizableKeys > 0;
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return this.normalizableKeyPrefixLen;
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return this.numLeadingNormalizableKeys < this.keyFields.length ||
-				this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
-				this.normalizableKeyPrefixLen > keyBytes;
-	}
-
-	@Override
-	public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
-		int i = 0;
-		for (; i < this.numLeadingNormalizableKeys & numBytes > 0; i++)
-		{
-			int len = this.normalizedKeyLengths[i];
-			len = numBytes >= len ? len : numBytes;
-			this.comparators[i].putNormalizedKey(accessField(keyFields[i], value), target, offset, len);
-			numBytes -= len;
-			offset += len;
-		}
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return this.invertNormKey;
-	}
-
-
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	@Override
-	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public PojoComparator<T> duplicate() {
-		return new PojoComparator<T>(this);
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		int localIndex = index;
-		for (int i = 0; i < comparators.length; i++) {
-			localIndex += comparators[i].extractKeys(accessField(keyFields[i], record), target, localIndex);
-		}
-		return localIndex - index;
-	}
-
-	// --------------------------------------------------------------------------------------------
-}
-


Mime
View raw message