flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [6/9] flink git commit: [FLINK-2637] [api-breaking] [types] Adds equals and hashCode method to TypeInformations and TypeSerializers - Fixes ObjectArrayTypeInfo - Makes CompositeTypes serializable - Adds test for equality relation's symmetric property - Adds test for PojoTypeInfo serializability
Date Thu, 17 Sep 2015 11:24:09 GMT
[FLINK-2637] [api-breaking] [types] Adds equals and hashCode method to TypeInformations and TypeSerializers
- Fixes ObjectArrayTypeInfo
- Makes CompositeTypes serializable
- Adds test for equality relation's symmetric property
- Adds test for PojoTypeInfo serializability

This closes #1134


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ca853e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ca853e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ca853e0

Branch: refs/heads/master
Commit: 8ca853e0f6c18be8e6b066c6ec0f23badb797323
Parents: 2c9e2c8
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Sep 8 01:12:09 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Sep 17 11:19:33 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |  56 ++++++
 .../api/common/typeinfo/BasicArrayTypeInfo.java |  44 ++++-
 .../api/common/typeinfo/BasicTypeInfo.java      |  29 ++-
 .../api/common/typeinfo/FractionalTypeInfo.java |  16 +-
 .../api/common/typeinfo/IntegerTypeInfo.java    |  19 +-
 .../api/common/typeinfo/NothingTypeInfo.java    |  26 +++
 .../api/common/typeinfo/NumericTypeInfo.java    |  23 ++-
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  45 +++--
 .../api/common/typeinfo/TypeInformation.java    |  17 ++
 .../api/common/typeutils/CompositeType.java     | 130 +++++++++----
 .../api/common/typeutils/TypeSerializer.java    |  12 ++
 .../typeutils/base/BooleanSerializer.java       |   5 +
 .../typeutils/base/BooleanValueSerializer.java  |   5 +
 .../common/typeutils/base/ByteSerializer.java   |   5 +
 .../typeutils/base/ByteValueSerializer.java     |   5 +
 .../common/typeutils/base/CharSerializer.java   |   5 +
 .../typeutils/base/CharValueSerializer.java     |   5 +
 .../common/typeutils/base/DateSerializer.java   |   5 +
 .../common/typeutils/base/DoubleSerializer.java |   5 +
 .../typeutils/base/DoubleValueSerializer.java   |   5 +
 .../common/typeutils/base/EnumSerializer.java   |  16 +-
 .../common/typeutils/base/FloatSerializer.java  |   5 +
 .../typeutils/base/FloatValueSerializer.java    |   5 +
 .../typeutils/base/GenericArraySerializer.java  |  35 ++--
 .../common/typeutils/base/IntSerializer.java    |   5 +
 .../typeutils/base/IntValueSerializer.java      |   5 +
 .../common/typeutils/base/LongSerializer.java   |   5 +
 .../typeutils/base/LongValueSerializer.java     |   5 +
 .../common/typeutils/base/ShortSerializer.java  |   5 +
 .../typeutils/base/ShortValueSerializer.java    |   5 +
 .../common/typeutils/base/StringSerializer.java |   5 +
 .../typeutils/base/StringValueSerializer.java   |   5 +
 .../typeutils/base/TypeSerializerSingleton.java |  10 +-
 .../common/typeutils/base/VoidSerializer.java   |   5 +
 .../array/BooleanPrimitiveArraySerializer.java  |   5 +
 .../array/BytePrimitiveArraySerializer.java     |   5 +
 .../array/CharPrimitiveArraySerializer.java     |   5 +
 .../array/DoublePrimitiveArraySerializer.java   |   5 +
 .../array/FloatPrimitiveArraySerializer.java    |   5 +
 .../base/array/IntPrimitiveArraySerializer.java |   5 +
 .../array/LongPrimitiveArraySerializer.java     |   5 +
 .../array/ShortPrimitiveArraySerializer.java    |   5 +
 .../base/array/StringArraySerializer.java       |   5 +
 .../typeutils/record/RecordSerializer.java      |  22 ++-
 .../flink/types/BasicArrayTypeInfoTest.java     |  56 ++++++
 .../apache/flink/types/BasicTypeInfoTest.java   |  58 ++++++
 .../apache/flink/types/NothingTypeInfoTest.java |  47 +++++
 .../flink/types/PrimitiveArrayTypeInfoTest.java |  56 ++++++
 .../org/apache/flink/api/java/io/CsvReader.java |   2 +-
 .../flink/api/java/sca/UdfAnalyzerUtils.java    |   2 +-
 .../flink/api/java/typeutils/AvroTypeInfo.java  |   4 +-
 .../flink/api/java/typeutils/EnumTypeInfo.java  |  22 ++-
 .../api/java/typeutils/GenericTypeInfo.java     |  17 +-
 .../api/java/typeutils/MissingTypeInfo.java     |  32 +++-
 .../api/java/typeutils/ObjectArrayTypeInfo.java | 114 ++++-------
 .../flink/api/java/typeutils/PojoField.java     |  42 +++-
 .../flink/api/java/typeutils/PojoTypeInfo.java  | 190 +++++++++++--------
 .../api/java/typeutils/RecordTypeInfo.java      |  16 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java | 115 ++++++-----
 .../api/java/typeutils/TupleTypeInfoBase.java   | 116 +++++------
 .../flink/api/java/typeutils/TypeExtractor.java |  41 +++-
 .../flink/api/java/typeutils/ValueTypeInfo.java |  32 ++--
 .../api/java/typeutils/WritableTypeInfo.java    |  31 ++-
 .../java/typeutils/runtime/AvroSerializer.java  |  26 ++-
 .../runtime/CopyableValueSerializer.java        |  19 +-
 .../java/typeutils/runtime/PojoSerializer.java  |  59 +++---
 .../typeutils/runtime/Tuple0Serializer.java     |  16 +-
 .../typeutils/runtime/TupleSerializerBase.java  |  34 ++--
 .../java/typeutils/runtime/ValueSerializer.java |  19 +-
 .../typeutils/runtime/WritableSerializer.java   |  12 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  |  30 ++-
 .../api/java/io/CollectionInputFormatTest.java  |  26 ++-
 .../type/extractor/PojoTypeExtractionTest.java  | 108 +++++------
 .../java/type/extractor/TypeExtractorTest.java  |   4 +-
 .../api/java/typeutils/EnumTypeInfoTest.java    |  51 +++++
 .../api/java/typeutils/GenericTypeInfoTest.java |  47 +++++
 .../api/java/typeutils/MissingTypeInfoTest.java |  47 +++++
 .../java/typeutils/ObjectArrayTypeInfoTest.java |  58 ++++++
 .../api/java/typeutils/PojoTypeInfoTest.java    |  52 ++++-
 .../api/java/typeutils/RecordTypeInfoTest.java  |  44 +++++
 .../api/java/typeutils/TupleTypeInfoTest.java   |  96 ++++++++++
 .../api/java/typeutils/TypeInfoParserTest.java  |  24 +--
 .../api/java/typeutils/ValueTypeInfoTest.java   |  87 +++++++++
 .../java/typeutils/WritableTypeInfoTest.java    |  74 ++++++++
 .../testutils/types/IntListSerializer.java      |  21 ++
 .../testutils/types/IntPairSerializer.java      |  21 ++
 .../testutils/types/StringPairSerializer.java   |  21 ++
 .../flink/api/scala/ExecutionEnvironment.scala  |   2 +-
 .../api/scala/codegen/TypeInformationGen.scala  |   2 +-
 .../scala/typeutils/CaseClassSerializer.scala   |  15 +-
 .../api/scala/typeutils/CaseClassTypeInfo.scala | 186 ++++++++++--------
 .../api/scala/typeutils/EitherSerializer.scala  |  19 +-
 .../api/scala/typeutils/EitherTypeInfo.scala    |  25 ++-
 .../scala/typeutils/EnumValueSerializer.scala   |  13 +-
 .../api/scala/typeutils/EnumValueTypeInfo.scala |  20 +-
 .../api/scala/typeutils/NothingSerializer.scala |  13 +-
 .../api/scala/typeutils/OptionSerializer.scala  |  17 +-
 .../api/scala/typeutils/OptionTypeInfo.scala    |  18 +-
 .../scala/typeutils/TraversableSerializer.scala |  17 +-
 .../scala/typeutils/TraversableTypeInfo.scala   |  24 ++-
 .../api/scala/typeutils/TrySerializer.scala     |  21 +-
 .../flink/api/scala/typeutils/TryTypeInfo.scala |  19 +-
 .../scala/typeutils/CaseClassTypeInfoTest.scala |  73 +++++++
 .../scala/typeutils/EitherTypeInfoTest.scala    |  59 ++++++
 .../scala/typeutils/EnumValueTypeInfoTest.scala |  64 +++++++
 .../scala/typeutils/OptionTypeInfoTest.scala    |  54 ++++++
 .../typeutils/TraversableTypeInfoTest.scala     |  68 +++++++
 .../api/scala/typeutils/TryTypeInfoTest.scala   |  55 ++++++
 .../api/windowing/StreamWindowSerializer.java   |  24 +++
 .../api/windowing/StreamWindowTypeInfo.java     |  38 +++-
 .../MultiplexingStreamRecordSerializer.java     |  21 ++
 .../streamrecord/StreamRecordSerializer.java    |  21 ++
 .../api/windowing/StreamWindowTypeInfoTest.java |  51 +++++
 .../table/typeinfo/RenamingProxyTypeInfo.scala  |  43 +++--
 .../api/table/typeinfo/RowSerializer.scala      |  14 +-
 .../api/java/table/test/PojoGroupingITCase.java |  89 +++++++++
 .../typeinfo/RenamingProxyTypeInfoTest.scala    |  75 ++++++++
 .../flink/api/scala/operators/UnionITCase.scala |  54 +++---
 .../scala/types/TypeInformationGenTest.scala    |   2 +-
 119 files changed, 3120 insertions(+), 730 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 4ae0b8d..3233327 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * A config to define the behavior of the program execution. It allows to define (among other
@@ -605,6 +606,61 @@ public class ExecutionConfig implements Serializable {
 		this.autoTypeRegistrationEnabled = false;
 	}
 
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof ExecutionConfig) {
+			ExecutionConfig other = (ExecutionConfig) obj;
+
+			return other.canEqual(this) &&
+				Objects.equals(executionMode, other.executionMode) &&
+				useClosureCleaner == other.useClosureCleaner &&
+				parallelism == other.parallelism &&
+				numberOfExecutionRetries == other.numberOfExecutionRetries &&
+				forceKryo == other.forceKryo &&
+				objectReuse == other.objectReuse &&
+				autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled &&
+				forceAvro == other.forceAvro &&
+				Objects.equals(codeAnalysisMode, other.codeAnalysisMode) &&
+				printProgressDuringExecution == other.printProgressDuringExecution &&
+				Objects.equals(globalJobParameters, other.globalJobParameters) &&
+				autoWatermarkInterval == other.autoWatermarkInterval &&
+				timestampsEnabled == other.timestampsEnabled &&
+				registeredTypesWithKryoSerializerClasses.equals(other.registeredTypesWithKryoSerializerClasses) &&
+				defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) &&
+				registeredKryoTypes.equals(other.registeredKryoTypes) &&
+				registeredPojoTypes.equals(other.registeredPojoTypes);
+
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(
+			executionMode,
+			useClosureCleaner,
+			parallelism,
+			numberOfExecutionRetries,
+			forceKryo,
+			objectReuse,
+			autoTypeRegistrationEnabled,
+			forceAvro,
+			codeAnalysisMode,
+			printProgressDuringExecution,
+			globalJobParameters,
+			autoWatermarkInterval,
+			timestampsEnabled,
+			registeredTypesWithKryoSerializerClasses,
+			defaultKryoSerializerClasses,
+			registeredKryoTypes,
+			registeredPojoTypes);
+	}
+
+	public boolean canEqual(Object obj) {
+		return obj instanceof ExecutionConfig;
+	}
+
 
 	// ------------------------------ Utilities  ----------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 4fcb266..c72e8ed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -20,14 +20,16 @@ package org.apache.flink.api.common.typeinfo;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 
-public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
+public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -45,14 +47,12 @@ public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 	// --------------------------------------------------------------------------------------------
 
 	private final Class<T> arrayClass;
-	private final Class<C> componentClass;
 	private final TypeInformation<C> componentInfo;
 
 	@SuppressWarnings("unchecked")
 	private BasicArrayTypeInfo(Class<T> arrayClass, BasicTypeInfo<C> componentInfo) {
-		this.arrayClass = arrayClass;
-		this.componentClass = (Class<C>) arrayClass.getComponentType();
-		this.componentInfo = componentInfo;
+		this.arrayClass = Preconditions.checkNotNull(arrayClass);
+		this.componentInfo = Preconditions.checkNotNull(componentInfo);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -83,7 +83,7 @@ public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 	}
 
 	public Class<C> getComponentTypeClass() {
-		return this.componentClass;
+		return this.componentInfo.getTypeClass();
 	}
 	
 	public TypeInformation<C> getComponentInfo() {
@@ -99,13 +99,39 @@ public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 	@SuppressWarnings("unchecked")
 	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
 		// special case the string array
-		if (componentClass.equals(String.class)) {
+		if (componentInfo.getTypeClass().equals(String.class)) {
 			return (TypeSerializer<T>) StringArraySerializer.INSTANCE;
 		} else {
-			return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer(executionConfig));
+			return (TypeSerializer<T>) new GenericArraySerializer<C>(
+				this.componentInfo.getTypeClass(),
+				this.componentInfo.createSerializer(executionConfig));
 		}
 	}
-	
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof BasicArrayTypeInfo) {
+			BasicArrayTypeInfo<?, ?> other = (BasicArrayTypeInfo<?, ?>) obj;
+
+			return other.canEqual(this) &&
+				arrayClass == other.arrayClass &&
+				componentInfo.equals(other.componentInfo);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(arrayClass, componentInfo);
+	}
+
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof BasicArrayTypeInfo;
+	}
+
 	@Override
 	public String toString() {
 		return this.getClass().getSimpleName()+"<"+this.componentInfo+">";

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index c622151..3e34644 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -19,10 +19,13 @@
 package org.apache.flink.api.common.typeinfo;
 
 import java.lang.reflect.Constructor;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -55,6 +58,8 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
  */
 public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
 
+	private static final long serialVersionUID = -430955220409131770L;
+
 	public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<String>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);
 	public static final BasicTypeInfo<Boolean> BOOLEAN_TYPE_INFO = new BasicTypeInfo<Boolean>(Boolean.class, new Class<?>[]{}, BooleanSerializer.INSTANCE, BooleanComparator.class);
 	public static final BasicTypeInfo<Byte> BYTE_TYPE_INFO = new IntegerTypeInfo<Byte>(Byte.class, new Class<?>[]{Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class}, ByteSerializer.INSTANCE, ByteComparator.class);
@@ -66,7 +71,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 	public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, new Class<?>[]{}, CharSerializer.INSTANCE, CharComparator.class);
 	public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, new Class<?>[]{}, DateSerializer.INSTANCE, DateComparator.class);
 	public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<Void>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, null);
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	private final Class<T> clazz;
@@ -79,9 +84,10 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 	
 	
 	protected BasicTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
-		this.clazz = clazz;
-		this.possibleCastTargetTypes = possibleCastTargetTypes;
-		this.serializer = serializer;
+		this.clazz = Preconditions.checkNotNull(clazz);
+		this.possibleCastTargetTypes = Preconditions.checkNotNull(possibleCastTargetTypes);
+		this.serializer = Preconditions.checkNotNull(serializer);
+		// comparator can be null as in VOID_TYPE_INFO
 		this.comparatorClass = comparatorClass;
 	}
 	
@@ -148,15 +154,24 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 	
 	@Override
 	public int hashCode() {
-		return this.clazz.hashCode();
+		return (31 * Objects.hash(clazz, serializer, comparatorClass)) + Arrays.hashCode(possibleCastTargetTypes);
 	}
-	
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof BasicTypeInfo;
+	}
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof BasicTypeInfo) {
 			@SuppressWarnings("unchecked")
 			BasicTypeInfo<T> other = (BasicTypeInfo<T>) obj;
-			return this.clazz.equals(other.clazz);
+
+			return other.canEqual(this) &&
+				this.clazz == other.clazz &&
+				serializer.equals(other.serializer) &&
+				this.comparatorClass == other.comparatorClass;
 		} else {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
index 7e5e95d..f357e34 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/FractionalTypeInfo.java
@@ -18,15 +18,29 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+import java.util.Set;
+
 /**
- * Type information for numeric primitive types (int, long, double, byte, ...).
+ * Type information for numeric fractional primitive types (double, float).
  */
 public class FractionalTypeInfo<T> extends NumericTypeInfo<T> {
 
+	private static final long serialVersionUID = 554334260950199994L;
+
+	private static final Set<Class<?>> fractionalTypes = Sets.<Class<?>>newHashSet(
+			Double.class,
+			Float.class
+	);
+
 	protected FractionalTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
 		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
+
+		Preconditions.checkArgument(fractionalTypes.contains(clazz), "The given class " +
+			clazz.getSimpleName() + " is not a fractional type.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
index 5a7e304..7f9dd92 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/IntegerTypeInfo.java
@@ -18,15 +18,32 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+import java.util.Set;
+
 /**
- * Type information for numeric primitive types (int, long, double, byte, ...).
+ * Type information for numeric integer primitive types: int, long, byte, short, character.
  */
 public class IntegerTypeInfo<T> extends NumericTypeInfo<T> {
 
+	private static final long serialVersionUID = -8068827354966766955L;
+
+	private static final Set<Class<?>> integerTypes = Sets.<Class<?>>newHashSet(
+			Integer.class,
+			Long.class,
+			Byte.class,
+			Short.class,
+			Character.class
+	);
+
 	protected IntegerTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
 		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
+
+		Preconditions.checkArgument(integerTypes.contains(clazz), "The given class " +
+		clazz.getSimpleName() + " is not a integer type.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
index 184bdff..a32c898 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
@@ -63,4 +63,30 @@ public class NothingTypeInfo extends TypeInformation<Nothing> {
 	public TypeSerializer<Nothing> createSerializer(ExecutionConfig executionConfig) {
 		throw new RuntimeException("The Nothing type cannot have a serializer.");
 	}
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof NothingTypeInfo) {
+			NothingTypeInfo nothingTypeInfo = (NothingTypeInfo) obj;
+
+			return nothingTypeInfo.canEqual(this);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return NothingTypeInfo.class.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof NothingTypeInfo;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
index 0f598f4..830e297 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NumericTypeInfo.java
@@ -18,16 +18,35 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+import java.util.Set;
+
 /**
- * Type information for numeric primitive types (int, long, double, byte, ...).
+ * Type information for numeric primitive types: int, long, double, byte, short, float, char.
  */
-public class NumericTypeInfo<T> extends BasicTypeInfo<T> {
+public abstract class NumericTypeInfo<T> extends BasicTypeInfo<T> {
+
+	private static final long serialVersionUID = -5937777910658986986L;
+
+	private static final Set<Class<?>> numericalTypes = Sets.<Class<?>>newHashSet(
+			Integer.class,
+			Long.class,
+			Double.class,
+			Byte.class,
+			Short.class,
+			Float.class,
+			Character.class
+	);
 
 	protected NumericTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends
 			TypeComparator<T>> comparatorClass) {
 		super(clazz, possibleCastTargetTypes, serializer, comparatorClass);
+
+		Preconditions.checkArgument(numericalTypes.contains(clazz), "The given class " +
+				clazz.getSimpleName() + " is not a numerical type.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 3843f28..44339ac 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -20,7 +20,9 @@ package org.apache.flink.api.common.typeinfo;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -69,23 +71,22 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
 	private final TypeSerializer<T> serializer;
 
 	/** The class of the comparator for the array */
-	private Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass;
+	private final Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass;
 
 	/**
 	 * Creates a new type info for a 
 	 * @param arrayClass The class of the array (such as int[].class)
 	 * @param serializer The serializer for the array.
+	 * @param comparatorClass The class of the array comparator
 	 */
 	private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer, Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass) {
-		if (arrayClass == null || serializer == null) {
-			throw new NullPointerException();
-		}
-		if (!(arrayClass.isArray() && arrayClass.getComponentType().isPrimitive())) {
-			throw new IllegalArgumentException("Class must represent an array of primitives.");
-		}
-		this.arrayClass = arrayClass;
-		this.serializer = serializer;
-		this.comparatorClass = comparatorClass;
+		this.arrayClass = Preconditions.checkNotNull(arrayClass);
+		this.serializer = Preconditions.checkNotNull(serializer);
+		this.comparatorClass = Preconditions.checkNotNull(comparatorClass);
+
+		Preconditions.checkArgument(
+			arrayClass.isArray() && arrayClass.getComponentType().isPrimitive(),
+			"Class must represent an array of primitives");
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -133,12 +134,28 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
 	@Override
 	public boolean equals(Object other) {
 		if (other instanceof PrimitiveArrayTypeInfo) {
-			PrimitiveArrayTypeInfo otherArray = (PrimitiveArrayTypeInfo) other;
-			return otherArray.arrayClass == arrayClass;
+			@SuppressWarnings("unchecked")
+			PrimitiveArrayTypeInfo<T> otherArray = (PrimitiveArrayTypeInfo<T>) other;
+
+			return otherArray.canEqual(this) &&
+				arrayClass == otherArray.arrayClass &&
+				serializer.equals(otherArray.serializer) &&
+				comparatorClass == otherArray.comparatorClass;
+		} else {
+			return false;
 		}
-		return false;
 	}
-	
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(arrayClass, serializer, comparatorClass);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof PrimitiveArrayTypeInfo;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index b090e67..309c968 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -146,4 +146,21 @@ public abstract class TypeInformation<T> implements Serializable {
 	 * @return A serializer for this type.
 	 */
 	public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
+
+	@Override
+	public abstract String toString();
+
+	@Override
+	public abstract boolean equals(Object obj);
+
+	@Override
+	public abstract int hashCode();
+
+	/**
+	 * Returns true if the given object can be equaled with this object. If not, it returns false.
+	 *
+	 * @param obj Object which wants to take part in the equality relation
+	 * @return true if obj can be equaled with this, otherwise false
+	 */
+	public abstract boolean canEqual(Object obj);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index de39ec8..909afdd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -20,14 +20,16 @@ package org.apache.flink.api.common.typeutils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 
 /**
- * Type Information for Tuple and Pojo types
+ * Base type information class for Tuple and Pojo types
  * 
  * The class is taking care of serialization and comparators for Tuples as well.
  */
@@ -35,10 +37,19 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 	
 	private static final long serialVersionUID = 1L;
 	
-	protected final Class<T> typeClass;
+	private final Class<T> typeClass;
 	
 	public CompositeType(Class<T> typeClass) {
-		this.typeClass = typeClass;
+		this.typeClass = Preconditions.checkNotNull(typeClass);
+	}
+
+	/**
+	 * Returns the type class of the composite type
+	 *
+	 * @return Type class of the composite type
+	 */
+	public Class<T> getTypeClass() {
+		return typeClass;
 	}
 	
 	/**
@@ -78,24 +89,8 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 	 * @return The type of the field at the given position.
 	 */
 	public abstract <X> TypeInformation<X> getTypeAt(int pos);
-	
-	/**
-	 * Initializes the internal state inside a Composite type to create a new comparator 
-	 * (such as the lists / arrays for the fields and field comparators)
-	 * @param localKeyCount 
-	 */
-	protected abstract void initializeNewComparator(int localKeyCount);
-	
-	/**
-	 * Add a field for comparison in this type.
-	 */
-	protected abstract void addCompareField(int fieldId, TypeComparator<?> comparator);
-	
-	/**
-	 * Get the actual comparator we've initialized.
-	 */
-	protected abstract TypeComparator<T> getNewComparator(ExecutionConfig config);
-	
+
+	protected abstract TypeComparatorBuilder<T> createTypeComparatorBuilder();
 	
 	/**
 	 * Generic implementation of the comparator creation. Composite types are supplying the infrastructure
@@ -103,38 +98,74 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 	 * @return The comparator
 	 */
 	public TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
-		initializeNewComparator(logicalKeyFields.length);
-		
-		for(int logicalKeyFieldIndex = 0; logicalKeyFieldIndex < logicalKeyFields.length; logicalKeyFieldIndex++) {
+
+		TypeComparatorBuilder<T> builder = createTypeComparatorBuilder();
+
+		builder.initializeTypeComparatorBuilder(logicalKeyFields.length);
+
+		for (int logicalKeyFieldIndex = 0; logicalKeyFieldIndex < logicalKeyFields.length; logicalKeyFieldIndex++) {
 			int logicalKeyField = logicalKeyFields[logicalKeyFieldIndex];
 			int logicalField = logicalFieldOffset; // this is the global/logical field number
-			for(int localFieldId = 0; localFieldId < this.getArity(); localFieldId++) {
+			boolean comparatorAdded = false;
+
+			for (int localFieldId = 0; localFieldId < this.getArity() && logicalField <= logicalKeyField && !comparatorAdded; localFieldId++) {
 				TypeInformation<?> localFieldType = this.getTypeAt(localFieldId);
 				
-				if(localFieldType instanceof AtomicType && logicalField == logicalKeyField) {
+				if (localFieldType instanceof AtomicType && logicalField == logicalKeyField) {
 					// we found an atomic key --> create comparator
-					addCompareField(localFieldId, ((AtomicType<?>) localFieldType).createComparator(orders[logicalKeyFieldIndex], config) );
-				} else if(localFieldType instanceof CompositeType  && // must be a composite type
-						( logicalField <= logicalKeyField //check if keyField can be at or behind the current logicalField
-						&& logicalKeyField <= logicalField + (localFieldType.getTotalFields() - 1) ) // check if logical field + lookahead could contain our key
-						) {
+					builder.addComparatorField(
+						localFieldId,
+						((AtomicType<?>) localFieldType).createComparator(
+							orders[logicalKeyFieldIndex],
+							config));
+
+					comparatorAdded = true;
+				}
+				// must be composite type and check that the logicalKeyField is within the bounds
+				// of the composite type's logical fields
+				else if (localFieldType instanceof CompositeType &&
+					logicalField <= logicalKeyField &&
+					logicalKeyField <= logicalField + (localFieldType.getTotalFields() - 1)) {
 					// we found a compositeType that is containing the logicalKeyField we are looking for --> create comparator
-					addCompareField(localFieldId, ((CompositeType<?>) localFieldType).createComparator(new int[] {logicalKeyField}, new boolean[] {orders[logicalKeyFieldIndex]}, logicalField, config));
+					builder.addComparatorField(
+						localFieldId,
+						((CompositeType<?>) localFieldType).createComparator(
+							new int[]{logicalKeyField},
+							new boolean[]{orders[logicalKeyFieldIndex]},
+							logicalField,
+							config)
+					);
+
+					comparatorAdded = true;
 				}
-				
-				// maintain logicalField
-				if(localFieldType instanceof CompositeType) {
+
+				if (localFieldType instanceof CompositeType) {
 					// we need to subtract 1 because we are not accounting for the local field (not accessible for the user)
 					logicalField += localFieldType.getTotalFields() - 1;
 				}
+				
 				logicalField++;
 			}
+
+			if (!comparatorAdded) {
+				throw new IllegalArgumentException("Could not add a comparator for the logical " +
+					"key field index " + logicalKeyFieldIndex + ".");
+			}
 		}
-		return getNewComparator(config);
+
+		return builder.createTypeComparator(config);
 	}
 
 	// --------------------------------------------------------------------------------------------
 
+	protected interface TypeComparatorBuilder<T> {
+		void initializeTypeComparatorBuilder(int size);
+
+		void addComparatorField(int fieldId, TypeComparator<?> comparator);
+
+		TypeComparator<T> createTypeComparator(ExecutionConfig config);
+	}
+
 	public static class FlatFieldDescriptor {
 		private int keyPosition;
 		private TypeInformation<?> type;
@@ -222,4 +253,31 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 			super(s);
 		}
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof CompositeType) {
+			@SuppressWarnings("unchecked")
+			CompositeType<T> compositeType = (CompositeType<T>)obj;
+
+			return compositeType.canEqual(this) && typeClass == compositeType.typeClass;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(typeClass);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof CompositeType;
+	}
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName() + "<" + typeClass.getSimpleName() + ">";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 45b0669..6d603d3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -146,4 +146,16 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 * @throws IOException Thrown if any of the two views raises an exception.
 	 */
 	public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
+
+	public abstract boolean equals(Object obj);
+
+	/**
+	 * Returns true if the given object can be equal to this object, and false otherwise.
+	 *
+	 * @param obj the object that wants to take part in the equality relation
+	 * @return true if obj can be equal to this object, otherwise false
+	 */
+	public abstract boolean canEqual(Object obj);
+
+	public abstract int hashCode();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index a844ac8..5fe586b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -75,4 +75,9 @@ public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeBoolean(source.readBoolean());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof BooleanSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index 3aae95d..a3d2c39 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -80,4 +80,9 @@ public final class BooleanValueSerializer extends TypeSerializerSingleton<Boolea
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeBoolean(source.readBoolean());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof BooleanValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index 92b3685..abc816c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -77,4 +77,9 @@ public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeByte(source.readByte());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ByteSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index e523d5e..ff44c27 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -78,4 +78,9 @@ public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeByte(source.readByte());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ByteValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 181db56..2734af7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -77,4 +77,9 @@ public final class CharSerializer extends TypeSerializerSingleton<Character> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeChar(source.readChar());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof CharSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 690509c..e14a861 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -77,4 +77,9 @@ public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeChar(source.readChar());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof CharValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index d427918..6703240 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -97,4 +97,9 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeLong(source.readLong());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof DateSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index 24af95c..f982399 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -76,4 +76,9 @@ public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeDouble(source.readDouble());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof DoubleSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index 34434f1..747d969 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -78,4 +78,9 @@ public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleV
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeDouble(source.readDouble());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof DoubleValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index 643e4fa..1c09e71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.lang.reflect.Method;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -36,7 +37,7 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 	private final Class<T> enumClass;
 
 	public EnumSerializer(Class<T> enumClass) {
-		this.enumClass = enumClass;
+		this.enumClass = Preconditions.checkNotNull(enumClass);
 		this.values = createValues(enumClass);
 	}
 
@@ -94,12 +95,23 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 	public boolean equals(Object obj) {
 		if(obj instanceof EnumSerializer) {
 			EnumSerializer<?> other = (EnumSerializer<?>) obj;
-			return other.enumClass == this.enumClass;
+
+			return other.canEqual(this) && other.enumClass == this.enumClass;
 		} else {
 			return false;
 		}
 	}
 
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof EnumSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return enumClass.hashCode();
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index c823783..1cc574c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -76,4 +76,9 @@ public final class FloatSerializer extends TypeSerializerSingleton<Float> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeFloat(source.readFloat());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof FloatSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index 15d00b5..11b17c8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -78,4 +78,9 @@ public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeFloat(source.readFloat());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof FloatValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 6687661..9ea6811 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.typeutils.base;
 import java.io.IOException;
 import java.lang.reflect.Array;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -38,17 +39,12 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	
 	private final TypeSerializer<C> componentSerializer;
 	
-	private final C[] EMPTY;
+	private transient C[] EMPTY;
 	
 	
 	public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
-		if (componentClass == null || componentSerializer == null) {
-			throw new NullPointerException();
-		}
-		
-		this.componentClass = componentClass;
-		this.componentSerializer = componentSerializer;
-		this.EMPTY = create(0);
+		this.componentClass = Preconditions.checkNotNull(componentClass);
+		this.componentSerializer = Preconditions.checkNotNull(componentSerializer);
 	}
 
 	@Override
@@ -70,6 +66,10 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	
 	@Override
 	public C[] createInstance() {
+		if (EMPTY == null) {
+			EMPTY = create(0);
+		}
+
 		return EMPTY;
 	}
 
@@ -158,20 +158,27 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	
 	@Override
 	public int hashCode() {
-		return componentClass.hashCode() + componentSerializer.hashCode();
+		return 31 * componentClass.hashCode() + componentSerializer.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof GenericArraySerializer) {
-			GenericArraySerializer<?> other = (GenericArraySerializer<?>) obj;
-			return this.componentClass == other.componentClass &&
-					this.componentSerializer.equals(other.componentSerializer);
+		if (obj instanceof GenericArraySerializer) {
+			GenericArraySerializer<?> other = (GenericArraySerializer<?>)obj;
+
+			return other.canEqual(this) &&
+				componentClass == other.componentClass &&
+				componentSerializer.equals(other.componentSerializer);
 		} else {
 			return false;
 		}
 	}
-	
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof GenericArraySerializer;
+	}
+
 	@Override
 	public String toString() {
 		return "Serializer " + componentClass.getName() + "[]";

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 778f044..e9d9059 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -77,4 +77,9 @@ public final class IntSerializer extends TypeSerializerSingleton<Integer> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeInt(source.readInt());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof IntSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index c2d1b60..a3a5cbb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -78,4 +78,9 @@ public final class IntValueSerializer extends TypeSerializerSingleton<IntValue>
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeInt(source.readInt());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof IntValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 6d8b758..2954477 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -77,4 +77,9 @@ public final class LongSerializer extends TypeSerializerSingleton<Long> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeLong(source.readLong());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof LongSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index 37dec40..6233240 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -78,4 +78,9 @@ public final class LongValueSerializer extends TypeSerializerSingleton<LongValue
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeLong(source.readLong());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof LongValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index 44e5e3e..45004fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -77,4 +77,9 @@ public final class ShortSerializer extends TypeSerializerSingleton<Short> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeShort(source.readShort());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ShortSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index 1dbe4a5..338d328 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -78,4 +78,9 @@ public final class ShortValueSerializer extends TypeSerializerSingleton<ShortVal
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.writeShort(source.readShort());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ShortValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index 7b26600..b5332f8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -77,4 +77,9 @@ public final class StringSerializer extends TypeSerializerSingleton<String> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		StringValue.copyString(source, target);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StringSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 7628cab..6ac8b6c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -103,4 +103,9 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV
 			}
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StringValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index e076e5b..68842d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -33,11 +33,17 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
 
 	@Override
 	public int hashCode() {
-		return super.hashCode();
+		return TypeSerializerSingleton.class.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == this.getClass();
+		if (obj instanceof TypeSerializerSingleton) {
+			TypeSerializerSingleton<?> other = (TypeSerializerSingleton<?>) obj;
+
+			return other.canEqual(this);
+		} else {
+			return false;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
index 272ffbd..ee1ebbd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
@@ -77,4 +77,9 @@ public final class VoidSerializer extends TypeSerializerSingleton<Void> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source.readByte());
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof VoidSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index 4a493ac..6386fc1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -101,4 +101,9 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet
 		target.writeInt(len);
 		target.write(source, len);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof BooleanPrimitiveArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index fb4d506..ea4c43e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -93,4 +93,9 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<
 		target.writeInt(len);
 		target.write(source, len);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof BytePrimitiveArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index 8e3c4ea..e2f7515 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -100,4 +100,9 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<
 		target.writeInt(len);
 		target.write(source, len * 2);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof CharPrimitiveArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index 10e25c2..2500312 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -100,4 +100,9 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto
 		target.writeInt(len);
 		target.write(source, len * 8);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof DoublePrimitiveArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index d57af00..ea2513e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -100,4 +100,9 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton
 		target.writeInt(len);
 		target.write(source, len * 4);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof FloatPrimitiveArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index eaff287..05d70c5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -100,4 +100,9 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
 		target.writeInt(len);
 		target.write(source, len * 4);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof IntPrimitiveArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index 55a22c2..e6f6cc5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -100,4 +100,9 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<
 		target.writeInt(len);
 		target.write(source, len * 8);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof LongPrimitiveArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index 08275b0..e2261e8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -100,4 +100,9 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton
 		target.writeInt(len);
 		target.write(source, len * 2);
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ShortPrimitiveArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index ad172a8..228429d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -104,4 +104,9 @@ public final class StringArraySerializer extends TypeSerializerSingleton<String[
 			StringValue.copyString(source, target);
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StringArraySerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
index 11b21d6..6ffa0df 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
@@ -40,7 +40,7 @@ public final class RecordSerializer extends TypeSerializer<Record> {
 	
 	// --------------------------------------------------------------------------------------------
 
-	public static final RecordSerializer get() {
+	public static RecordSerializer get() {
 		return INSTANCE;
 	}
 	
@@ -121,4 +121,24 @@ public final class RecordSerializer extends TypeSerializer<Record> {
 		
 		target.write(source, val);
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof RecordSerializer) {
+			RecordSerializer other = (RecordSerializer) obj;
+			return other.canEqual(this);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof RecordSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return RecordSerializer.class.hashCode();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/test/java/org/apache/flink/types/BasicArrayTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/BasicArrayTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/types/BasicArrayTypeInfoTest.java
new file mode 100644
index 0000000..3e086ff
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/BasicArrayTypeInfoTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class BasicArrayTypeInfoTest extends TestLogger {
+
+	static Class<?>[] classes = {String[].class, Integer[].class, Boolean[].class, Byte[].class,
+		Short[].class, Long[].class, Float[].class, Double[].class, Character[].class};
+
+	@Test
+	public void testBasicArrayTypeInfoEquality() {
+		for (Class<?> clazz: classes) {
+			BasicArrayTypeInfo<?, ?> tpeInfo1 = BasicArrayTypeInfo.getInfoFor(clazz);
+			BasicArrayTypeInfo<?, ?> tpeInfo2 = BasicArrayTypeInfo.getInfoFor(clazz);
+
+			assertEquals(tpeInfo1, tpeInfo2);
+			assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+		}
+	}
+
+	@Test
+	public void testBasicArrayTypeInfoInequality() {
+		for (Class<?> clazz1: classes) {
+			for (Class<?> clazz2: classes) {
+				if (!clazz1.equals(clazz2)) {
+					BasicArrayTypeInfo<?, ?> tpeInfo1 = BasicArrayTypeInfo.getInfoFor(clazz1);
+					BasicArrayTypeInfo<?, ?> tpeInfo2 = BasicArrayTypeInfo.getInfoFor(clazz2);
+					assertNotEquals(tpeInfo1, tpeInfo2);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
new file mode 100644
index 0000000..cdd06d0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Date;
+
+import static org.junit.Assert.*;
+
+public class BasicTypeInfoTest extends TestLogger {
+
+	static Class<?>[] classes = {String.class, Integer.class, Boolean.class, Byte.class,
+		Short.class, Long.class, Float.class, Double.class, Character.class, Date.class,
+		Void.class};
+
+	@Test
+	public void testBasicTypeInfoEquality() {
+		for (Class<?> clazz: classes) {
+			BasicTypeInfo<?> tpeInfo1 = BasicTypeInfo.getInfoFor(clazz);
+			BasicTypeInfo<?> tpeInfo2 = BasicTypeInfo.getInfoFor(clazz);
+
+			assertEquals(tpeInfo1, tpeInfo2);
+			assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+		}
+	}
+
+	@Test
+	public void testBasicTypeInfoInequality() {
+		for (Class<?> clazz1: classes) {
+			for (Class<?> clazz2: classes) {
+				if (!clazz1.equals(clazz2)) {
+					BasicTypeInfo<?> tpeInfo1 = BasicTypeInfo.getInfoFor(clazz1);
+					BasicTypeInfo<?> tpeInfo2 = BasicTypeInfo.getInfoFor(clazz2);
+					assertNotEquals(tpeInfo1, tpeInfo2);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/test/java/org/apache/flink/types/NothingTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/NothingTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/types/NothingTypeInfoTest.java
new file mode 100644
index 0000000..a7976ee
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/NothingTypeInfoTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class NothingTypeInfoTest extends TestLogger {
+
+	@Test
+	public void testNothingTypeInfoEquality() {
+		NothingTypeInfo tpeInfo1 = new NothingTypeInfo();
+		NothingTypeInfo tpeInfo2 = new NothingTypeInfo();
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testNothingTypeInfoInequality() {
+		NothingTypeInfo tpeInfo1 = new NothingTypeInfo();
+		BasicTypeInfo<Integer> tpeInfo2 = BasicTypeInfo.getInfoFor(Integer.class);
+
+		assertNotEquals(tpeInfo1, tpeInfo2);
+		assertNotEquals(tpeInfo2, tpeInfo1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-core/src/test/java/org/apache/flink/types/PrimitiveArrayTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/PrimitiveArrayTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/types/PrimitiveArrayTypeInfoTest.java
new file mode 100644
index 0000000..7ef14f9
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/PrimitiveArrayTypeInfoTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class PrimitiveArrayTypeInfoTest extends TestLogger {
+
+	static Class<?>[] classes = {int[].class, boolean[].class, byte[].class,
+		short[].class, long[].class, float[].class, double[].class, char[].class};
+
+	@Test
+	public void testPrimitiveArrayTypeInfoEquality() {
+		for (Class<?> clazz: classes) {
+			PrimitiveArrayTypeInfo<?> tpeInfo1 = PrimitiveArrayTypeInfo.getInfoFor(clazz);
+			PrimitiveArrayTypeInfo<?> tpeInfo2 = PrimitiveArrayTypeInfo.getInfoFor(clazz);
+
+			assertEquals(tpeInfo1, tpeInfo2);
+			assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+		}
+	}
+
+	@Test
+	public void testBasicArrayTypeInfoInequality() {
+		for (Class<?> clazz1: classes) {
+			for (Class<?> clazz2: classes) {
+				if (!clazz1.equals(clazz2)) {
+					PrimitiveArrayTypeInfo<?> tpeInfo1 = PrimitiveArrayTypeInfo.getInfoFor(clazz1);
+					PrimitiveArrayTypeInfo<?> tpeInfo2 = PrimitiveArrayTypeInfo.getInfoFor(clazz2);
+					assertNotEquals(tpeInfo1, tpeInfo2);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 052f960..34b5b47 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -300,7 +300,7 @@ public class CsvReader {
 			if(pos < 0) {
 				throw new IllegalArgumentException("Field \""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName());
 			}
-			classes[i] = typeInfo.getPojoFieldAt(pos).type.getTypeClass();
+			classes[i] = typeInfo.getPojoFieldAt(pos).getTypeInformation().getTypeClass();
 		}
 
 		configureInputFormat(inputFormat, classes);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
index dbfd29e..eccdeac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/UdfAnalyzerUtils.java
@@ -75,7 +75,7 @@ public final class UdfAnalyzerUtils {
 			final PojoTypeInfo<?> pojoTypeInfo = (PojoTypeInfo<?>) typeInfo;
 			HashMap<String, TaggedValue> containerMapping = new HashMap<String, TaggedValue>();
 			for (int i = 0; i < pojoTypeInfo.getArity(); i++) {
-				final String fieldName = pojoTypeInfo.getPojoFieldAt(i).field.getName();
+				final String fieldName = pojoTypeInfo.getPojoFieldAt(i).getField().getName();
 				containerMapping.put(fieldName,
 						convertTypeInfoToTaggedValue(input,
 								pojoTypeInfo.getTypeAt(i),

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
index ed03627..59c7eb0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -56,7 +56,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 
 		for(int i = 0; i < pti.getArity(); i++) {
 			PojoField f = pti.getPojoFieldAt(i);
-			TypeInformation newType = f.type;
+			TypeInformation newType = f.getTypeInformation();
 			// check if type is a CharSequence
 			if(newType instanceof GenericTypeInfo) {
 				if((newType).getTypeClass().equals(CharSequence.class)) {
@@ -64,7 +64,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 					newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
 				}
 			}
-			PojoField newField = new PojoField(f.field, newType);
+			PojoField newField = new PojoField(f.getField(), newType);
 			newFields.add(newField);
 		}
 		return newFields;


Mime
View raw message