flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/2] flink git commit: [FLINK-1463] Fix stateful/stateless Serializers and Comparators
Date Mon, 09 Feb 2015 13:28:29 GMT
[FLINK-1463] Fix stateful/stateless Serializers and Comparators

Before, Serializers would announce whether they are stateful or not and
rely on RuntimeStatefulSerializerFactory to do the duplication.
Comparators, on the other hand, had a duplicate method that the user was
required to call.

This commit removes the statful/stateless property from Serializers but
instead introduces a duplicate() method, similar to Comparators, that
can return the same instance.

The two serializer factories are merged into one that always calls
duplicate() before returning a serializer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02b6f85f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02b6f85f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02b6f85f

Branch: refs/heads/release-0.8
Commit: 02b6f85fed8c1409b586e1e934acd72cee54adac
Parents: 91382bb
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Jan 30 16:43:31 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 9 14:20:23 2015 +0100

----------------------------------------------------------------------
 .../api/function/source/FileSourceFunction.java |  10 +-
 .../streamrecord/StreamRecordSerializer.java    |   4 +-
 .../compiler/postpass/JavaApiPostPass.java      |  11 +-
 .../api/common/typeutils/TypeSerializer.java    |  16 +--
 .../typeutils/base/BooleanSerializer.java       |   5 -
 .../typeutils/base/BooleanValueSerializer.java  |   5 -
 .../common/typeutils/base/ByteSerializer.java   |   5 -
 .../typeutils/base/ByteValueSerializer.java     |   5 -
 .../common/typeutils/base/CharSerializer.java   |   5 -
 .../typeutils/base/CharValueSerializer.java     |   5 -
 .../common/typeutils/base/DateSerializer.java   |   5 -
 .../common/typeutils/base/DoubleSerializer.java |   5 -
 .../typeutils/base/DoubleValueSerializer.java   |   5 -
 .../common/typeutils/base/EnumSerializer.java   |   4 +-
 .../common/typeutils/base/FloatSerializer.java  |   5 -
 .../typeutils/base/FloatValueSerializer.java    |   5 -
 .../typeutils/base/GenericArraySerializer.java  |  12 +-
 .../common/typeutils/base/IntSerializer.java    |   5 -
 .../typeutils/base/IntValueSerializer.java      |   5 -
 .../common/typeutils/base/LongSerializer.java   |   5 -
 .../typeutils/base/LongValueSerializer.java     |   5 -
 .../common/typeutils/base/ShortSerializer.java  |   5 -
 .../typeutils/base/ShortValueSerializer.java    |   5 -
 .../common/typeutils/base/StringSerializer.java |   5 -
 .../typeutils/base/StringValueSerializer.java   |   5 -
 .../typeutils/base/TypeSerializerSingleton.java |   7 +-
 .../common/typeutils/base/VoidSerializer.java   |   5 -
 .../array/BooleanPrimitiveArraySerializer.java  |   5 -
 .../array/BytePrimitiveArraySerializer.java     |   5 -
 .../array/CharPrimitiveArraySerializer.java     |   5 -
 .../array/DoublePrimitiveArraySerializer.java   |   5 -
 .../array/FloatPrimitiveArraySerializer.java    |   5 -
 .../base/array/IntPrimitiveArraySerializer.java |   5 -
 .../array/LongPrimitiveArraySerializer.java     |   5 -
 .../array/ShortPrimitiveArraySerializer.java    |   5 -
 .../base/array/StringArraySerializer.java       |   5 -
 .../typeutils/record/RecordSerializer.java      |   5 +-
 .../java/typeutils/runtime/AvroSerializer.java  |   4 +-
 .../runtime/CopyableValueSerializer.java        |   4 +-
 .../runtime/GenericTypeComparator.java          |  14 +-
 .../java/typeutils/runtime/KryoSerializer.java  |   4 +-
 .../java/typeutils/runtime/PojoSerializer.java  |  33 +++--
 .../runtime/RuntimeSerializerFactory.java       | 124 ++++++++++++++++
 .../RuntimeStatefulSerializerFactory.java       | 140 -------------------
 .../RuntimeStatelessSerializerFactory.java      | 120 ----------------
 .../typeutils/runtime/TupleComparatorBase.java  |  28 +---
 .../java/typeutils/runtime/TupleSerializer.java |  20 +++
 .../typeutils/runtime/TupleSerializerBase.java  |  19 +--
 .../java/typeutils/runtime/ValueSerializer.java |   4 +-
 .../typeutils/runtime/WritableSerializer.java   |   4 +-
 .../api/java/io/CollectionInputFormatTest.java  |   4 +-
 .../operators/drivers/TestTaskContext.java      |   6 +-
 .../sort/MassiveStringSortingITCase.java        |   7 +-
 .../sort/MassiveStringValueSortingITCase.java   |   7 +-
 .../testutils/types/IntListSerializer.java      |   4 +-
 .../testutils/types/IntPairSerializer.java      |   8 +-
 .../testutils/types/StringPairSerializer.java   |   4 +-
 .../scala/typeutils/CaseClassSerializer.scala   |  16 ++-
 .../api/scala/typeutils/EitherSerializer.scala  |   2 +-
 .../api/scala/typeutils/NothingSerializer.scala |   2 +-
 .../api/scala/typeutils/OptionSerializer.scala  |   2 +-
 .../scala/typeutils/TraversableSerializer.scala |  18 ++-
 .../api/scala/typeutils/TrySerializer.scala     |   2 +-
 .../VertexWithAdjacencyListSerializer.java      |   5 -
 .../VertexWithRankAndDanglingSerializer.java    |   5 -
 .../types/VertexWithRankSerializer.java         |   5 -
 .../misc/MassiveCaseClassSortingITCase.scala    |   6 +-
 67 files changed, 268 insertions(+), 567 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 5dfe4b2..a5ef3a7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -49,12 +48,7 @@ public class FileSourceFunction extends RichSourceFunction<String> {
 	private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
 		TypeSerializer<String> serializer = typeInfo.createSerializer();
 
-		if (serializer.isStateful()) {
-			return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass());
-		} else {
-			return new RuntimeStatelessSerializerFactory<String>(serializer,
-					typeInfo.getTypeClass());
-		}
+		return new RuntimeSerializerFactory<String>(serializer, typeInfo.getTypeClass());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 85faa9e..98f12ec 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -47,8 +47,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
+	public StreamRecordSerializer<T> duplicate() {
+		return this;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index 2273461..e1b5bf8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -43,8 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.CompilerPostPassException;
 import org.apache.flink.compiler.plan.BulkIterationPlanNode;
@@ -278,12 +277,8 @@ public class JavaApiPostPass implements OptimizerPostPass {
 	
 	private static <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) {
 		TypeSerializer<T> serializer = typeInfo.createSerializer();
-		
-		if (serializer.isStateful()) {
-			return new RuntimeStatefulSerializerFactory<T>(serializer, typeInfo.getTypeClass());
-		} else {
-			return new RuntimeStatelessSerializerFactory<T>(serializer, typeInfo.getTypeClass());
-		}
+
+		return new RuntimeSerializerFactory<T>(serializer, typeInfo.getTypeClass());
 	}
 	
 	private static <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 5e32c86..329e826 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -50,17 +50,15 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 */
 	public abstract boolean isImmutableType();
 	
-	
 	/**
-	 * Gets whether the serializer is stateful. Statefulness means in this context that some of the serializer's
-	 * methods have objects with state and are thus not inherently thread-safe. A stateful serializer might be used by
-	 * multiple threads concurrently. For a stateful one, different instances will be used by different threads.
-	 * 
-	 * @return True, if the serializer is stateful, false if it is stateless;
+	 * Creates a deep copy of this serializer if it is necessary, i.e. if it is stateful. This
+	 * can return itself if the serializer is not stateful.
+	 *
+	 * We need this because Serializers might be used in several threads. Stateless serializers
+	 * are inherently thread-safe while stateful serializers might not be thread-safe.
 	 */
-	public abstract boolean isStateful();
-	
-	
+	public abstract TypeSerializer<T> duplicate();
+
 	// --------------------------------------------------------------------------------------------
 	// Instantiation & Cloning
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index ecfb3c2..a844ac8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -37,11 +37,6 @@ public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public Boolean createInstance() {
 		return FALSE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index 4795055..3aae95d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -38,11 +38,6 @@ public final class BooleanValueSerializer extends TypeSerializerSingleton<Boolea
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public BooleanValue createInstance() {
 		return new BooleanValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index 32f3edd..92b3685 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -39,11 +39,6 @@ public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public Byte createInstance() {
 		return ZERO;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index 24cc98b..e523d5e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -38,11 +38,6 @@ public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public ByteValue createInstance() {
 		return new ByteValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index c46d3a0..181db56 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -39,11 +39,6 @@ public final class CharSerializer extends TypeSerializerSingleton<Character> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public Character createInstance() {
 		return ZERO;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 71a8ef4..690509c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -37,11 +37,6 @@ public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public CharValue createInstance() {
 		return new CharValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 4bd2ea8..6aa11eb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -37,11 +37,6 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public Date createInstance() {
 		return new Date();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index 8e09f7c..24af95c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -38,11 +38,6 @@ public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-
-	@Override
 	public Double createInstance() {
 		return ZERO;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index f4c7f37..34434f1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -38,11 +38,6 @@ public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleV
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public DoubleValue createInstance() {
 		return new DoubleValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index 7ecf82a..643e4fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -46,8 +46,8 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
+	public EnumSerializer<T> duplicate() {
+		return this;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index b1a46b0..c823783 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -38,11 +38,6 @@ public final class FloatSerializer extends TypeSerializerSingleton<Float> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-
-	@Override
 	public Float createInstance() {
 		return ZERO;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index 6ebb268..15d00b5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -38,11 +38,6 @@ public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public FloatValue createInstance() {
 		return new FloatValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index c72132d..355abd0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -50,15 +50,21 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 		this.componentSerializer = componentSerializer;
 		this.EMPTY = create(0);
 	}
-	
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
 	}
 
 	@Override
-	public boolean isStateful() {
-		return this.componentSerializer.isStateful();
+	public GenericArraySerializer<C> duplicate() {
+		TypeSerializer<C> duplicateComponentSerializer = this.componentSerializer.duplicate();
+		if (duplicateComponentSerializer == this.componentSerializer) {
+			// is not stateful, return ourselves
+			return this;
+		} else {
+			return new GenericArraySerializer<C>(componentClass, duplicateComponentSerializer);
+		}
 	}
 
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 2937b2a..778f044 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -39,11 +39,6 @@ public final class IntSerializer extends TypeSerializerSingleton<Integer> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public Integer createInstance() {
 		return ZERO;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index ec1f345..c2d1b60 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -38,11 +38,6 @@ public final class IntValueSerializer extends TypeSerializerSingleton<IntValue>
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public IntValue createInstance() {
 		return new IntValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 6b25596..6d8b758 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -39,11 +39,6 @@ public final class LongSerializer extends TypeSerializerSingleton<Long> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public Long createInstance() {
 		return ZERO;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index 95caf04..37dec40 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -38,11 +38,6 @@ public final class LongValueSerializer extends TypeSerializerSingleton<LongValue
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public LongValue createInstance() {
 		return new LongValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index c6e7870..44e5e3e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -39,11 +39,6 @@ public final class ShortSerializer extends TypeSerializerSingleton<Short> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-
-	@Override
 	public Short createInstance() {
 		return ZERO;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index ab58987..1dbe4a5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -38,11 +38,6 @@ public final class ShortValueSerializer extends TypeSerializerSingleton<ShortVal
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public ShortValue createInstance() {
 		return new ShortValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index 71221a2..7b26600 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -39,11 +39,6 @@ public final class StringSerializer extends TypeSerializerSingleton<String> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-
-	@Override
 	public String createInstance() {
 		return EMPTY;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index c5d5b55..7628cab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -40,11 +40,6 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public StringValue createInstance() {
 		return new StringValue();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 979d5ab..e076e5b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -25,7 +25,12 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
 	private static final long serialVersionUID = 8766687317209282373L;
 
 	// --------------------------------------------------------------------------------------------
-	
+
+	@Override
+	public TypeSerializerSingleton<T> duplicate() {
+		return this;
+	}
+
 	@Override
 	public int hashCode() {
 		return super.hashCode();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
index 33bb901..272ffbd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
@@ -35,11 +35,6 @@ public final class VoidSerializer extends TypeSerializerSingleton<Void> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public Void createInstance() {
 		return null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index e9941a8..4a493ac 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -40,11 +40,6 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet
 	public boolean isImmutableType() {
 		return false;
 	}
-
-	@Override
-	public boolean isStateful() {
-		return false;
-	}
 	
 	@Override
 	public boolean[] createInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index aaf867f..fb4d506 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -41,11 +41,6 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public byte[] createInstance() {
 		return EMPTY;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index 64632bd..8e3c4ea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public char[] createInstance() {
 		return EMPTY;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index 846ae74..10e25c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public double[] createInstance() {
 		return EMPTY;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index 8f42ac8..d57af00 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public float[] createInstance() {
 		return EMPTY;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index 2ab056c..eaff287 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public int[] createInstance() {
 		return EMPTY;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index 5d34dfe..55a22c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public long[] createInstance() {
 		return EMPTY;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index 2f37033..08275b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -40,11 +40,6 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton
 	public boolean isImmutableType() {
 		return false;
 	}
-
-	@Override
-	public boolean isStateful() {
-		return false;
-	}
 	
 	@Override
 	public short[] createInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index d5ab030..ad172a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -44,11 +44,6 @@ public final class StringArraySerializer extends TypeSerializerSingleton<String[
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	
-	@Override
 	public String[] createInstance() {
 		return EMPTY;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
index 7b72e89..11b21d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
@@ -57,8 +57,9 @@ public final class RecordSerializer extends TypeSerializer<Record> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
+	public RecordSerializer duplicate() {
+		// does not hold state, so just return ourselves
+		return this;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index cc72fa3..2758bd6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -79,8 +79,8 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return true;
+	public AvroSerializer duplicate() {
+		return new AvroSerializer(type, typeToInstantiate);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 8710f2d..193d495 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -47,8 +47,8 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
+	public CopyableValueSerializer<T> duplicate() {
+		return this;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
index 7caa770..039cef7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -42,9 +41,7 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
 
 	private final Class<T> type;
 
-	private final TypeSerializerFactory<T> serializerFactory;
-
-	private transient TypeSerializer<T> serializer;
+	private TypeSerializer<T> serializer;
 
 	private transient T reference;
 
@@ -61,15 +58,11 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
 		this.ascending = ascending;
 		this.serializer = serializer;
 		this.type = type;
-
-		this.serializerFactory = this.serializer.isStateful()
-				? new RuntimeStatefulSerializerFactory<T>(this.serializer, this.type)
-				: new RuntimeStatelessSerializerFactory<T>(this.serializer, this.type);
 	}
 
 	private GenericTypeComparator(GenericTypeComparator<T> toClone) {
 		this.ascending = toClone.ascending;
-		this.serializerFactory = toClone.serializerFactory;
+		this.serializer = toClone.serializer.duplicate();
 		this.type = toClone.type;
 	}
 
@@ -104,9 +97,6 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
 
 	@Override
 	public int compareSerialized(final DataInputView firstSource, final DataInputView secondSource) throws IOException {
-		if (this.serializer == null) {
-			this.serializer = this.serializerFactory.getSerializer();
-		}
 
 		if (this.reference == null) {
 			this.reference = this.serializer.createInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 3a51534..a5f5f65 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -63,8 +63,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return true;
+	public KryoSerializer<T> duplicate() {
+		return new KryoSerializer<T>(this.type);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 99b9f65..7bd3243 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -42,9 +42,6 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	private final int numFields;
 
-	private final boolean stateful;
-
-
 	@SuppressWarnings("unchecked")
 	public PojoSerializer(Class<T> clazz, TypeSerializer<?>[] fieldSerializers, Field[] fields) {
 		this.clazz = clazz;
@@ -55,15 +52,6 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		for (int i = 0; i < numFields; i++) {
 			this.fields[i].setAccessible(true);
 		}
-
-		boolean stateful = false;
-		for (TypeSerializer<?> ser : fieldSerializers) {
-			if (ser.isStateful()) {
-				stateful = true;
-				break;
-			}
-		}
-		this.stateful = stateful;
 	}
 
 	private void writeObject(ObjectOutputStream out)
@@ -109,10 +97,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return this.stateful;
+	public PojoSerializer<T> duplicate() {
+		boolean stateful = false;
+		TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
+
+		for (int i = 0; i < fieldSerializers.length; i++) {
+			duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
+			if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+				// at least one of them is stateful
+				stateful = true;
+			}
+		}
+
+		if (stateful) {
+			return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields);
+		} else {
+			return this;
+		}
 	}
-	
+
 	
 	@Override
 	public T createInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
new file mode 100644
index 0000000..96aff73
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+public final class RuntimeSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+
+	private static final String CONFIG_KEY_SER = "SER_DATA";
+
+	private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
+
+	
+	private TypeSerializer<T> serializer;
+
+	private boolean firstSerializer = true;
+
+	private Class<T> clazz;
+
+	// Because we read the class from the TaskConfig and instantiate ourselves
+	public RuntimeSerializerFactory() {}
+
+	public RuntimeSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) {
+		if (serializer == null || clazz == null) {
+			throw new NullPointerException();
+		}
+
+		this.clazz = clazz;
+		this.serializer = serializer;
+	}
+
+
+	@Override
+	public void writeParametersToConfig(Configuration config) {
+		try {
+			InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS);
+			InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not serialize serializer into the configuration.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+		if (config == null || cl == null) {
+			throw new NullPointerException();
+		}
+		
+		try {
+			this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
+			this.serializer = (TypeSerializer<T>)  InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
+			firstSerializer = true;
+		}
+		catch (ClassNotFoundException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not load deserializer from the configuration.", e);
+		}
+	}
+
+	@Override
+	public TypeSerializer<T> getSerializer() {
+		if (this.serializer != null) {
+			if (firstSerializer) {
+				firstSerializer = false;
+				return this.serializer;
+			} else {
+				return this.serializer.duplicate();
+			}
+		} else {
+			throw new RuntimeException("SerializerFactory has not been initialized from configuration.");
+		}
+	}
+
+	@Override
+	public Class<T> getDataType() {
+		return clazz;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return clazz.hashCode() ^ serializer.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj instanceof RuntimeSerializerFactory) {
+			RuntimeSerializerFactory<?> other = (RuntimeSerializerFactory<?>) obj;
+			
+			return this.clazz == other.clazz &&
+					this.serializer.equals(other.serializer);
+		} else {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
deleted file mode 100644
index 19cd3b7..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class RuntimeStatefulSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final String CONFIG_KEY_SER = "SER_DATA";
-
-	private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
-
-	private byte[] serializerData;
-	
-	private TypeSerializer<T> serializer;		// only for equality comparisons
-	
-	private transient ClassLoader loader;
-
-	private Class<T> clazz;
-
-	public RuntimeStatefulSerializerFactory() {}
-
-	public RuntimeStatefulSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) {
-		this.clazz = clazz;
-		this.loader = serializer.getClass().getClassLoader();
-		
-		try {
-			this.serializerData = InstantiationUtil.serializeObject(serializer);
-		} catch (IOException e) {
-			throw new RuntimeException("Cannt serialize the Serializer.", e);
-		}
-	}
-
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {
-		try {
-			InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS);
-			config.setBytes(CONFIG_KEY_SER, this.serializerData);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not serialize serializer into the configuration.", e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
-		if (config == null || cl == null) {
-			throw new NullPointerException();
-		}
-		
-		this.serializerData = config.getBytes(CONFIG_KEY_SER, null);
-		if (this.serializerData == null) {
-			throw new RuntimeException("Could not find deserializer in the configuration."); 
-		}
-		
-		this.loader = cl;
-		
-		try {
-			this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
-		}
-		catch (ClassNotFoundException e) {
-			throw e;
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not load deserializer from the configuration.", e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public TypeSerializer<T> getSerializer() {
-		if (serializerData != null) {
-			try {
-				return (TypeSerializer<T>) InstantiationUtil.deserializeObject(this.serializerData, this.loader);
-			} catch (Exception e) {
-				throw new RuntimeException("Repeated instantiation of serializer failed.", e);
-			}
-		} else {
-			throw new RuntimeException("SerializerFactory has not been initialized from configuration.");
-		}
-	}
-
-	@Override
-	public Class<T> getDataType() {
-		return this.clazz;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return clazz.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof RuntimeStatefulSerializerFactory) {
-			@SuppressWarnings("unchecked")
-			RuntimeStatefulSerializerFactory<T> other = (RuntimeStatefulSerializerFactory<T>) obj;
-			
-			if (this.serializer == null) {
-				this.serializer = getSerializer();
-			}
-			if (other.serializer == null) {
-				other.serializer = other.getSerializer();
-			}
-			
-			return this.clazz == other.clazz &&
-					this.serializer.equals(other.serializer);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java
deleted file mode 100644
index 041b824..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class RuntimeStatelessSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-
-	private static final String CONFIG_KEY_SER = "SER_DATA";
-
-	private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
-
-	
-	private TypeSerializer<T> serializer;
-
-	private Class<T> clazz;
-
-
-	public RuntimeStatelessSerializerFactory() {}
-
-	public RuntimeStatelessSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) {
-		if (serializer == null || clazz == null) {
-			throw new NullPointerException();
-		}
-		
-		if (serializer.isStateful()) {
-			throw new IllegalArgumentException("Cannot use the stateless serializer factory with a stateful serializer.");
-		}
-		
-		this.clazz = clazz;
-		this.serializer = serializer;
-	}
-
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {
-		try {
-			InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS);
-			InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not serialize serializer into the configuration.", e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
-		if (config == null || cl == null) {
-			throw new NullPointerException();
-		}
-		
-		try {
-			this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
-			this.serializer = (TypeSerializer<T>)  InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
-		}
-		catch (ClassNotFoundException e) {
-			throw e;
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not load deserializer from the configuration.", e);
-		}
-	}
-
-	@Override
-	public TypeSerializer<T> getSerializer() {
-		if (this.serializer != null) {
-			return this.serializer;
-		} else {
-			throw new RuntimeException("SerializerFactory has not been initialized from configuration.");
-		}
-	}
-
-	@Override
-	public Class<T> getDataType() {
-		return clazz;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return clazz.hashCode() ^ serializer.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof RuntimeStatelessSerializerFactory) {
-			RuntimeStatelessSerializerFactory<?> other = (RuntimeStatelessSerializerFactory<?>) obj;
-			
-			return this.clazz == other.clazz &&
-					this.serializer.equals(other.serializer);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
index abcf89c..28169e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.KeyFieldOutOfBoundsException;
@@ -41,10 +40,6 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T>
 	@SuppressWarnings("rawtypes")
 	protected TypeComparator[] comparators;
 
-	/** serializer factories to duplicate non thread-safe serializers */
-	protected TypeSerializerFactory<Object>[] serializerFactories;
-
-
 	protected int[] normalizedKeyLengths;
 
 	protected int numLeadingNormalizableKeys;
@@ -56,7 +51,7 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T>
 
 	/** serializers to deserialize the first n fields for comparison */
 	@SuppressWarnings("rawtypes")
-	protected transient TypeSerializer[] serializers;
+	protected TypeSerializer[] serializers;
 
 	// cache for the deserialized field objects
 	protected transient Object[] deserializedFields1;
@@ -70,14 +65,6 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T>
 		this.comparators = (TypeComparator<Object>[]) comparators;
 		this.serializers = (TypeSerializer<Object>[]) serializers;
 
-		// set the serializer factories.
-		this.serializerFactories = new TypeSerializerFactory[this.serializers.length];
-		for (int i = 0; i < serializers.length; i++) {
-			this.serializerFactories[i] = this.serializers[i].isStateful() ?
-					new RuntimeStatefulSerializerFactory<Object>(this.serializers[i], Object.class) :
-					new RuntimeStatelessSerializerFactory<Object>(this.serializers[i], Object.class);
-		}
-
 		// set up auxiliary fields for normalized key support
 		this.normalizedKeyLengths = new int[keyPositions.length];
 		int nKeys = 0;
@@ -129,7 +116,11 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T>
 	protected void privateDuplicate(TupleComparatorBase<T> toClone) {
 		// copy fields and serializer factories
 		this.keyPositions = toClone.keyPositions;
-		this.serializerFactories = toClone.serializerFactories;
+
+		this.serializers = new TypeSerializer[toClone.serializers.length];
+		for (int i = 0; i < toClone.serializers.length; i++) {
+			this.serializers[i] = toClone.serializers[i].duplicate();
+		}
 
 		this.comparators = new TypeComparator[toClone.comparators.length];
 		for (int i = 0; i < toClone.comparators.length; i++) {
@@ -261,13 +252,6 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T>
 	// --------------------------------------------------------------------------------------------
 	
 	protected final void instantiateDeserializationUtils() {
-		if (this.serializers == null) {
-			this.serializers = new TypeSerializer[this.serializerFactories.length];
-			for (int i = 0; i < this.serializers.length; i++) {
-				this.serializers[i] = this.serializerFactories[i].getSerializer();
-			}
-		}
-		
 		this.deserializedFields1 = new Object[this.serializers.length];
 		this.deserializedFields2 = new Object[this.serializers.length];
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index ae429a7..9071446 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -36,6 +36,26 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 	}
 
 	@Override
+	public TupleSerializer<T> duplicate() {
+		boolean stateful = false;
+		TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
+
+		for (int i = 0; i < fieldSerializers.length; i++) {
+			duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
+			if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+				// at least one of them is stateful
+				stateful = true;
+			}
+		}
+
+		if (stateful) {
+			return new TupleSerializer<T>(tupleClass, duplicateFieldSerializers);
+		} else {
+			return this;
+		}
+	}
+
+	@Override
 	public T createInstance() {
 		try {
 			T t = tupleClass.newInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 08df7d3..afc99d0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -32,27 +32,15 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
 	protected final Class<T> tupleClass;
 
-	protected final TypeSerializer<Object>[] fieldSerializers;
+	protected TypeSerializer<Object>[] fieldSerializers;
 
 	protected final int arity;
 
-	protected final boolean stateful;
-
-
 	@SuppressWarnings("unchecked")
 	public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
 		this.tupleClass = tupleClass;
 		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
 		this.arity = fieldSerializers.length;
-		
-		boolean stateful = false;
-		for (TypeSerializer<?> ser : fieldSerializers) {
-			if (ser.isStateful()) {
-				stateful = true;
-				break;
-			}
-		}
-		this.stateful = stateful;
 	}
 	
 	
@@ -62,11 +50,6 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return this.stateful;
-	}
-
-	@Override
 	public int getLength() {
 		return -1;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index d6c35cb..ad1b0f0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -62,8 +62,8 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return true;
+	public ValueSerializer<T> duplicate() {
+		return new ValueSerializer<T>(type);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index c89733e..777122e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -98,8 +98,8 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	}
 	
 	@Override
-	public boolean isStateful() {
-		return true;
+	public WritableSerializer duplicate() {
+		return new WritableSerializer(typeClass);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 64dae22..118e707 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -278,8 +278,8 @@ public class CollectionInputFormatTest {
 		}
 
 		@Override
-		public boolean isStateful() {
-			return false;
+		public TestSerializer duplicate() {
+			return this;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index e5ece3f..09ef9ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.drivers;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -86,13 +86,13 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 	@SuppressWarnings("unchecked")
 	public <X> void setInput1(MutableObjectIterator<X> input, TypeSerializer<X> serializer) {
 		this.input1 = input;
-		this.serializer1 = new RuntimeStatefulSerializerFactory<X>(serializer, (Class<X>) serializer.createInstance().getClass());
+		this.serializer1 = new RuntimeSerializerFactory<X>(serializer, (Class<X>) serializer.createInstance().getClass());
 	}
 
 	@SuppressWarnings("unchecked")
 	public <X> void setInput2(MutableObjectIterator<X> input, TypeSerializer<X> serializer) {
 		this.input2 = input;
-		this.serializer2 = new RuntimeStatefulSerializerFactory<X>(serializer, (Class<X>) serializer.createInstance().getClass());
+		this.serializer2 = new RuntimeSerializerFactory<X>(serializer, (Class<X>) serializer.createInstance().getClass());
 	}
 	
 	public void setComparator1(TypeComparator<?> comparator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 9dec847..63ca948 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -34,12 +34,11 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Assert;
@@ -95,7 +94,7 @@ public class MassiveStringSortingITCase {
 				MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeStatelessSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f);
+						new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f);
 
 				MutableObjectIterator<String> sortedData = sorter.getIterator();
 				
@@ -187,7 +186,7 @@ public class MassiveStringSortingITCase {
 				MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeStatelessSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
+						new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
 
 				
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
index f2a3fc7..8368c61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
@@ -33,12 +33,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
 import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
@@ -91,7 +90,7 @@ public class MassiveStringValueSortingITCase {
 				MutableObjectIterator<StringValue> inputIterator = new StringValueReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeStatelessSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f);
+						new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f);
 
 				MutableObjectIterator<StringValue> sortedData = sorter.getIterator();
 				
@@ -185,7 +184,7 @@ public class MassiveStringValueSortingITCase {
 				MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeStatelessSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
+						new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
 
 				
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index 2134bcd..69dfeb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -35,8 +35,8 @@ public class IntListSerializer extends TypeSerializer<IntList> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
+	public IntListSerializer duplicate() {
+		return this;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index 361585d..c2571cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -38,8 +38,8 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
+	public IntPairSerializer duplicate() {
+		return this;
 	}
 
 	@Override
@@ -105,12 +105,12 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 		public Class<IntPair> getDataType() {
 			return IntPair.class;
 		}
-		
+
 		@Override
 		public int hashCode() {
 			return 42;
 		}
-		
+
 		public boolean equals(Object obj) {
 			return obj.getClass() == IntPairSerializerFactory.class;
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index a38633c..388e8bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -35,8 +35,8 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
 	}
 
 	@Override
-	public boolean isStateful() {
-		return false;
+	public StringPairSerializer duplicate() {
+		return this;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index cba824a..91fb67a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.commons.lang.SerializationUtils
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
@@ -29,12 +30,23 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 abstract class CaseClassSerializer[T <: Product](
     clazz: Class[T],
     scalaFieldSerializers: Array[TypeSerializer[_]])
-  extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {
+  extends TupleSerializerBase[T](clazz, scalaFieldSerializers) with Cloneable {
 
   @transient var fields : Array[AnyRef] = _
 
   @transient var instanceCreationFailed : Boolean = false
 
+  override def duplicate = {
+    val result = this.clone().asInstanceOf[CaseClassSerializer[T]]
+
+    // set transient fields to null and make copy of serializers
+    result.fields = null
+    result.instanceCreationFailed = false
+    result.fieldSerializers = fieldSerializers.map(_.duplicate())
+
+    result
+  }
+
   def createInstance: T = {
     if (instanceCreationFailed) {
       null.asInstanceOf[T]
@@ -58,8 +70,6 @@ abstract class CaseClassSerializer[T <: Product](
     }
   }
 
-  override def isStateful() = true
-
   def copy(from: T, reuse: T): T = {
     copy(from)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index d28e9dd..c14e27a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -28,7 +28,7 @@ class EitherSerializer[A, B, T <: Either[A, B]](
     val rightSerializer: TypeSerializer[B])
   extends TypeSerializer[T] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: EitherSerializer[A,B,T] = this
 
   override def createInstance: T = {
     Left(null).asInstanceOf[T]

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index f25dd6c..bd076ed 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -27,7 +27,7 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
  */
 class NothingSerializer extends TypeSerializer[Any] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: NothingSerializer = this
 
   override def createInstance: Any = {
     Integer.valueOf(-1)


Mime
View raw message