flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [5/5] flink git commit: [FLINK-1369] [types] Add support for Subclasses, Interfaces, Abstract Classes.
Date Tue, 10 Feb 2015 13:11:15 GMT
[FLINK-1369] [types] Add support for Subclasses, Interfaces, Abstract Classes.

- Abstract classes with fields are handled as POJO types.
- Interfaces and abstract classes without fields are handled as generic types.

This closes #236
This closes #316


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7407076d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7407076d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7407076d

Branch: refs/heads/master
Commit: 7407076d3990752eb5fa4072cd036efd2f656cbc
Parents: 6b402f4
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Nov 26 13:27:06 2014 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Feb 10 13:15:03 2015 +0100

----------------------------------------------------------------------
 .../compiler/postpass/JavaApiPostPass.java      |  15 +-
 .../flink/compiler/util/NoOpBinaryUdfOp.java    |   3 +-
 .../flink/compiler/util/NoOpUnaryUdfOp.java     |   3 +-
 flink-core/pom.xml                              |  48 +-
 .../flink/api/common/ExecutionConfig.java       | 160 ++++++-
 .../api/common/functions/RuntimeContext.java    |   7 +
 .../util/AbstractRuntimeUDFContext.java         |  17 +-
 .../functions/util/RuntimeUDFContext.java       |   9 +-
 .../common/operators/CollectionExecutor.java    |  32 +-
 .../api/common/operators/DualInputOperator.java |   3 +-
 .../common/operators/GenericDataSinkBase.java   |   7 +-
 .../common/operators/GenericDataSourceBase.java |   5 +-
 .../common/operators/SingleInputOperator.java   |   3 +-
 .../flink/api/common/operators/Union.java       |   3 +-
 .../operators/base/BulkIterationBase.java       |   3 +-
 .../operators/base/CoGroupOperatorBase.java     |  27 +-
 .../base/CollectorMapOperatorBase.java          |   3 +-
 .../operators/base/CrossOperatorBase.java       |  13 +-
 .../operators/base/DeltaIterationBase.java      |   3 +-
 .../operators/base/FilterOperatorBase.java      |   3 +-
 .../operators/base/FlatMapOperatorBase.java     |  11 +-
 .../operators/base/GroupReduceOperatorBase.java |  23 +-
 .../common/operators/base/JoinOperatorBase.java |  21 +-
 .../common/operators/base/MapOperatorBase.java  |  11 +-
 .../base/MapPartitionOperatorBase.java          |  11 +-
 .../operators/base/PartitionOperatorBase.java   |   3 +-
 .../operators/base/ReduceOperatorBase.java      |  14 +-
 .../flink/api/common/typeinfo/AtomicType.java   |   3 +-
 .../api/common/typeinfo/BasicArrayTypeInfo.java |   5 +-
 .../api/common/typeinfo/BasicTypeInfo.java      |   5 +-
 .../api/common/typeinfo/NothingTypeInfo.java    |   3 +-
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |   3 +-
 .../api/common/typeinfo/TypeInformation.java    |   9 +-
 .../api/common/typeutils/CompositeType.java     |  11 +-
 .../functions/util/RuntimeUDFContextTest.java   |  11 +-
 .../base/FlatMapOperatorCollectionTest.java     |   9 +-
 .../operators/base/JoinOperatorBaseTest.java    |  15 +-
 .../common/operators/base/MapOperatorTest.java  |  16 +-
 .../base/PartitionMapOperatorTest.java          |  10 +-
 .../common/typeutils/SerializerTestBase.java    |   2 +-
 .../flink/api/java/CollectionEnvironment.java   |   2 +-
 .../java/org/apache/flink/api/java/DataSet.java |  12 +-
 .../flink/api/java/ExecutionEnvironment.java    |  43 +-
 .../flink/api/java/io/CsvOutputFormat.java      |   3 +-
 .../java/io/LocalCollectionOutputFormat.java    |   5 +-
 .../api/java/io/TypeSerializerInputFormat.java  |   4 +-
 .../api/java/io/TypeSerializerOutputFormat.java |   5 +-
 .../flink/api/java/operators/CrossOperator.java |   4 +-
 .../flink/api/java/operators/JoinOperator.java  |   4 +-
 .../api/java/operators/ProjectOperator.java     |   4 +-
 .../translation/PlanProjectOperator.java        |   5 +-
 .../flink/api/java/typeutils/EnumTypeInfo.java  |   5 +-
 .../api/java/typeutils/GenericTypeInfo.java     |  12 +-
 .../java/typeutils/InputTypeConfigurable.java   |   9 +-
 .../api/java/typeutils/MissingTypeInfo.java     |   3 +-
 .../api/java/typeutils/ObjectArrayTypeInfo.java |   7 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  17 +-
 .../api/java/typeutils/RecordTypeInfo.java      |   3 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java |  11 +-
 .../flink/api/java/typeutils/TypeExtractor.java |   1 +
 .../flink/api/java/typeutils/ValueTypeInfo.java |   5 +-
 .../api/java/typeutils/WritableTypeInfo.java    |   5 +-
 .../java/typeutils/runtime/KryoSerializer.java  |  96 +---
 .../java/typeutils/runtime/PojoSerializer.java  | 459 +++++++++++++++----
 .../base/CoGroupOperatorCollectionTest.java     |  20 +-
 .../operators/base/GroupReduceOperatorTest.java |  17 +-
 .../operators/base/JoinOperatorBaseTest.java    |   9 +-
 .../operators/base/ReduceOperatorTest.java      |  15 +-
 .../api/java/io/CollectionInputFormatTest.java  |   5 +-
 .../api/java/io/TypeSerializerFormatTest.java   |   3 +-
 .../java/type/extractor/TypeExtractorTest.java  |  38 +-
 .../api/java/typeutils/CompositeTypeTest.java   |   3 +-
 .../api/java/typeutils/TypeInfoParserTest.java  |   8 +-
 .../runtime/CopyableValueComparatorTest.java    |   2 -
 .../runtime/KryoGenericArraySerializerTest.java |   3 +-
 .../runtime/KryoGenericTypeComparatorTest.java  |   3 +-
 .../runtime/KryoGenericTypeSerializerTest.java  |   5 +-
 .../runtime/KryoVersusAvroMinibenchmark.java    |  13 +-
 .../runtime/KryoWithCustomSerializersTest.java  |  12 +-
 .../MultidimensionalArraySerializerTest.java    |  11 +-
 .../typeutils/runtime/PojoComparatorTest.java   |   5 +-
 .../runtime/PojoGenericTypeSerializerTest.java  |   3 +-
 .../typeutils/runtime/PojoSerializerTest.java   |  13 +-
 .../runtime/PojoSubclassComparatorTest.java     |  76 +++
 .../runtime/PojoSubclassSerializerTest.java     | 196 ++++++++
 .../SubclassFromInterfaceSerializerTest.java    | 170 +++++++
 .../runtime/TupleComparatorILD2Test.java        |   2 -
 .../runtime/TupleComparatorILD3Test.java        |   2 -
 .../runtime/TupleComparatorILDC3Test.java       |   2 -
 .../runtime/TupleComparatorILDX1Test.java       |   2 -
 .../runtime/TupleComparatorILDXC2Test.java      |   2 -
 .../runtime/TupleComparatorISD1Test.java        |   2 -
 .../runtime/TupleComparatorISD2Test.java        |   2 -
 .../runtime/TupleComparatorISD3Test.java        |   2 -
 .../typeutils/runtime/TupleSerializerTest.java  |   3 +-
 .../typeutils/runtime/ValueComparatorTest.java  |   2 -
 .../runtime/WritableComparatorTest.java         |   2 -
 .../runtime/WritableSerializerTest.java         |   4 +-
 .../java/type/lambdas/LambdaExtractionTest.java |   1 -
 .../task/AbstractIterativePactTask.java         |   7 +-
 .../jobgraph/tasks/AbstractInvokable.java       |  27 ++
 .../flink/runtime/operators/DataSourceTask.java |  18 -
 .../runtime/operators/PactTaskContext.java      |   6 +-
 .../runtime/operators/RegularPactTask.java      |  19 +-
 .../operators/chaining/ChainedDriver.java       |   2 +-
 .../operators/sort/LargeRecordHandler.java      |  11 +-
 .../util/DistributedRuntimeUDFContext.java      |   9 +-
 .../drivers/AllGroupReduceDriverTest.java       |  15 +-
 .../operators/drivers/AllReduceDriverTest.java  |  27 +-
 .../drivers/GroupReduceDriverTest.java          |  39 +-
 .../drivers/ReduceCombineDriverTest.java        |  39 +-
 .../operators/drivers/ReduceDriverTest.java     |  39 +-
 .../sort/ExternalSortLargeRecordsITCase.java    |  17 +-
 .../sort/LargeRecordHandlerITCase.java          |   7 +-
 .../operators/sort/LargeRecordHandlerTest.java  |  13 +-
 .../sort/MassiveStringSortingITCase.java        |   5 +-
 .../sort/MassiveStringValueSortingITCase.java   |   5 +-
 .../scala/operators/ScalaAggregateOperator.java |  15 +-
 .../scala/operators/ScalaCsvInputFormat.java    |   7 +-
 .../scala/operators/ScalaCsvOutputFormat.java   |   3 +-
 .../apache/flink/api/scala/CrossDataSet.scala   |   5 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |   8 +-
 .../api/scala/UnfinishedCoGroupOperation.scala  |   6 +-
 .../flink/api/scala/codegen/TypeAnalyzer.scala  |   8 +-
 .../api/scala/codegen/TypeInformationGen.scala  |  12 +-
 .../apache/flink/api/scala/joinDataSet.scala    |   6 +-
 .../api/scala/typeutils/CaseClassTypeInfo.scala |  11 +-
 .../api/scala/typeutils/EitherTypeInfo.scala    |  17 +-
 .../api/scala/typeutils/OptionTypeInfo.scala    |   6 +-
 .../scala/typeutils/TraversableTypeInfo.scala   |   3 +-
 .../api/scala/typeutils/TrySerializer.scala     |   5 +-
 .../flink/api/scala/typeutils/TryTypeInfo.scala |   8 +-
 .../mapred/HadoopReduceCombineFunction.java     |   4 +-
 .../mapred/HadoopReduceFunction.java            |   4 +-
 .../wrapper/HadoopTupleUnwrappingIterator.java  |   5 +-
 .../HadoopTupleUnwrappingIteratorTest.java      |   4 +-
 .../apache/flink/streaming/api/StreamGraph.java |  19 +-
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../temporaloperator/StreamJoinOperator.java    |   8 +-
 .../environment/StreamExecutionEnvironment.java |  63 ++-
 .../aggregation/AggregationFunction.java        |   4 +-
 .../aggregation/ComparableAggregator.java       |   2 +-
 .../api/function/aggregation/SumAggregator.java |   2 +-
 .../api/function/source/FileSourceFunction.java |   4 +-
 .../api/invokable/StreamInvokable.java          |   9 +-
 .../operator/GroupedWindowInvokable.java        |   2 +-
 .../invokable/operator/ProjectInvokable.java    |   2 +-
 .../api/invokable/operator/co/CoInvokable.java  |   3 +-
 .../streamrecord/StreamRecordSerializer.java    |   5 +-
 .../api/streamvertex/CoStreamVertex.java        |   2 +-
 .../api/streamvertex/StreamVertex.java          |   4 +-
 .../streamvertex/StreamingRuntimeContext.java   |   5 +-
 .../streaming/util/keys/KeySelectorUtil.java    |   6 +-
 .../streaming/api/AggregationFunctionTest.java  |   3 +-
 .../flink/streaming/util/MockCoContext.java     |   7 +-
 .../flink/streaming/util/MockContext.java       |   5 +-
 .../flink/streaming/api/scala/DataStream.scala  |   6 +-
 .../api/scala/StreamCrossOperator.scala         |   6 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  30 ++
 .../api/scala/StreamJoinOperator.scala          |  25 +-
 .../api/scala/WindowedDataStream.scala          |   8 +-
 .../WordCountSubclassInterfacePOJOITCase.java   | 152 ++++++
 .../WordCountSubclassPOJOITCase.java            | 123 +++++
 .../scala/io/CollectionInputFormatTest.scala    |  10 +-
 .../misc/MassiveCaseClassSortingITCase.scala    |   9 +-
 .../scala/runtime/CaseClassComparatorTest.scala |   9 +-
 .../runtime/KryoGenericTypeSerializerTest.scala |  13 +-
 .../ScalaSpecialTypesSerializerTest.scala       |   6 +-
 .../runtime/TraversableSerializerTest.scala     |   5 +-
 .../scala/runtime/TupleComparatorILD2Test.scala |   8 +-
 .../scala/runtime/TupleComparatorILD3Test.scala |   9 +-
 .../runtime/TupleComparatorILDC3Test.scala      |   9 +-
 .../runtime/TupleComparatorILDX1Test.scala      |   5 +-
 .../runtime/TupleComparatorILDXC2Test.scala     |   5 +-
 .../scala/runtime/TupleComparatorISD1Test.scala |   5 +-
 .../scala/runtime/TupleComparatorISD2Test.scala |   5 +-
 .../scala/runtime/TupleComparatorISD3Test.scala |   9 +-
 .../api/scala/runtime/TupleSerializerTest.scala |  10 +-
 .../scala/types/TypeInformationGenTest.scala    |   1 +
 179 files changed, 2211 insertions(+), 801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index 208ff2e..11ac231 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
@@ -70,9 +71,13 @@ public class JavaApiPostPass implements OptimizerPostPass {
 	
 	private final Set<PlanNode> alreadyDone = new HashSet<PlanNode>();
 
+	private ExecutionConfig executionConfig = null;
 	
 	@Override
 	public void postPass(OptimizedPlan plan) {
+
+		executionConfig = plan.getOriginalPactPlan().getExecutionConfig();
+
 		for (SinkPlanNode sink : plan.getDataSinks()) {
 			traverse(sink);
 		}
@@ -275,22 +280,22 @@ public class JavaApiPostPass implements OptimizerPostPass {
 		}
 	}
 	
-	private static <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) {
-		TypeSerializer<T> serializer = typeInfo.createSerializer();
+	private <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) {
+		TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);
 
 		return new RuntimeSerializerFactory<T>(serializer, typeInfo.getTypeClass());
 	}
 	
 	@SuppressWarnings("unchecked")
-	private static <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) {
+	private <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) {
 		
 		TypeComparator<T> comparator;
 		if (typeInfo instanceof CompositeType) {
-			comparator = ((CompositeType<T>) typeInfo).createComparator(keys.toArray(), sortOrder, 0);
+			comparator = ((CompositeType<T>) typeInfo).createComparator(keys.toArray(), sortOrder, 0, executionConfig);
 		}
 		else if (typeInfo instanceof AtomicType) {
 			// handle grouping of atomic types
-			comparator = ((AtomicType<T>) typeInfo).createComparator(sortOrder[0]);
+			comparator = ((AtomicType<T>) typeInfo).createComparator(sortOrder[0], executionConfig);
 		}
 		else {
 			throw new RuntimeException("Unrecognized type: " + typeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
index 4b48ec7..166b7b8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
@@ -20,6 +20,7 @@ package org.apache.flink.compiler.util;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -43,7 +44,7 @@ public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpF
 	}
 
 	@Override
-	protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, boolean mutables) {
+	protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
index 474d3a4..5013ae5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
@@ -20,6 +20,7 @@ package org.apache.flink.compiler.util;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.RecordOperator;
@@ -54,7 +55,7 @@ public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunct
 	}
 
 	@Override
-	protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, boolean mutables) {
+	protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
 		return inputData;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 810860e..182a77a 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -46,7 +46,18 @@ under the License.
 			<artifactId>commons-collections</artifactId>
 			<!-- managed version -->
 		</dependency>
-		
+
+		<dependency>
+			<groupId>com.esotericsoftware.kryo</groupId>
+			<artifactId>kryo</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>chill_2.10</artifactId>
+			<version>0.5.1</version>
+		</dependency>
+
 		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
 		<dependency>
 			<groupId>com.google.guava</groupId>
@@ -72,4 +83,39 @@ under the License.
 		</plugins>
 	</build>
 
+	<!-- See main pom.xml for explanation of profiles -->
+	<profiles>
+		<profile>
+			<id>hadoop-1</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop1--><name>hadoop.profile</name><value>1</value>
+				</property>
+			</activation>
+			<dependencies>
+				<!-- "Old" Hadoop = MapReduce v1 -->
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 17e683f..5fa01b7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,7 +18,13 @@
 
 package org.apache.flink.api.common;
 
+import com.esotericsoftware.kryo.Serializer;
+
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * A configuration config for configuring behavior of the system, such as whether to use
@@ -34,11 +40,17 @@ public class ExecutionConfig implements Serializable {
 	private boolean useClosureCleaner = true;
 	private int degreeOfParallelism = -1;
 	private int numberOfExecutionRetries = -1;
+	private boolean forceKryo = false;
 
-	// For future use...
-//	private boolean forceGenericSerializer = false;
 	private boolean objectReuse = false;
 
+
+	// Serializers and types registered with Kryo and the PojoSerializer
+	private final Map<Class<?>, Serializer<?>> registeredKryoSerializers = new HashMap<Class<?>, Serializer<?>>();
+	private final Map<Class<?>, Class<? extends Serializer<?>>> registeredKryoSerializersClasses = new HashMap<Class<?>, Class<? extends Serializer<?>>>();
+	private final Set<Class<?>> registeredKryoTypes = new HashSet<Class<?>>();
+	private final Set<Class<?>> registeredPojoTypes = new HashSet<Class<?>>();
+
 	/**
 	 * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null
 	 * that are not used. This will in most cases make closures or anonymous inner classes
@@ -128,21 +140,26 @@ public class ExecutionConfig implements Serializable {
 		return this;
 	}
 
-	// These are for future use...
-//	public ExecutionConfig forceGenericSerializer() {
-//		forceGenericSerializer = true;
-//		return this;
-//	}
-//
-//	public ExecutionConfig disableForceGenericSerializer() {
-//		forceGenericSerializer = false;
-//		return this;
-//	}
-//
-//	public boolean isForceGenericSerializerEnabled() {
-//		return forceGenericSerializer;
-//	}
-//
+	/**
+	 * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
+	 * In some cases this might be preferable. For example, when using interfaces
+	 * with subclasses that cannot be analyzed as POJO.
+	 */
+	public void enableForceKryo() {
+		forceKryo = true;
+	}
+
+
+	/**
+	 * Disable use of Kryo serializer for all POJOs.
+	 */
+	public void disableForceKryo() {
+		forceKryo = false;
+	}
+
+	public boolean isForceKryoEnabled() {
+		return forceKryo;
+	}
 
 	/**
 	 * Enables reusing objects that Flink internally uses for deserialization and passing
@@ -169,4 +186,113 @@ public class ExecutionConfig implements Serializable {
 	public boolean isObjectReuseEnabled() {
 		return objectReuse;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Registry for types and serializers
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Registers the given Serializer as a default serializer for the given type at the
+	 * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}.
+	 *
+	 * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
+	 * because it may be distributed to the worker nodes by java serialization.
+	 *
+	 * @param type The class of the types serialized with the given serializer.
+	 * @param serializer The serializer to use.
+	 */
+	public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) {
+		if (type == null || serializer == null) {
+			throw new NullPointerException("Cannot register null class or serializer.");
+		}
+		if (!(serializer instanceof java.io.Serializable)) {
+			throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), "
+					+ "as defined by java.io.Serializable. For stateless serializers, you can use the "
+					+ "'registerSerializer(Class, Class)' method to register the serializer via its class.");
+		}
+
+		registeredKryoSerializers.put(type, serializer);
+	}
+
+	/**
+	 * Registers the given Serializer via its class as a serializer for the given type at the
+	 * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}.
+	 *
+	 * @param type The class of the types serialized with the given serializer.
+	 * @param serializerClass The class of the serializer to use.
+	 */
+	public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+		if (type == null || serializerClass == null) {
+			throw new NullPointerException("Cannot register null class or serializer.");
+		}
+
+		registeredKryoSerializersClasses.put(type, serializerClass);
+	}
+
+	/**
+	 * Registers the given type with the serialization stack. If the type is eventually
+	 * serialized as a POJO, then the type is registered with the POJO serializer. If the
+	 * type ends up being serialized with Kryo, then it will be registered at Kryo to make
+	 * sure that only tags are written.
+	 *
+	 * @param type The class of the type to register.
+	 */
+	public void registerPojoType(Class<?> type) {
+		if (type == null) {
+			throw new NullPointerException("Cannot register null type class.");
+		}
+		registeredPojoTypes.add(type);
+	}
+
+	/**
+	 * Registers the given type with the serialization stack. If the type is eventually
+	 * serialized as a POJO, then the type is registered with the POJO serializer. If the
+	 * type ends up being serialized with Kryo, then it will be registered at Kryo to make
+	 * sure that only tags are written.
+	 *
+	 * @param type The class of the type to register.
+	 */
+	public void registerKryoType(Class<?> type) {
+		if (type == null) {
+			throw new NullPointerException("Cannot register null type class.");
+		}
+		registeredKryoTypes.add(type);
+	}
+
+	/**
+	 * Returns the registered Kryo Serializers.
+	 */
+	public Map<Class<?>, Serializer<?>> getRegisteredKryoSerializers() {
+		return registeredKryoSerializers;
+	}
+
+	/**
+	 * Returns the registered Kryo Serializer classes.
+	 */
+	public Map<Class<?>, Class<? extends Serializer<?>>> getRegisteredKryoSerializersClasses() {
+		return registeredKryoSerializersClasses;
+	}
+
+	/**
+	 * Returns the registered Kryo types.
+	 */
+	public Set<Class<?>> getRegisteredKryoTypes() {
+		if (isForceKryoEnabled()) {
+			// if we force kryo, we must also return all the types that
+			// were previously only registered as POJO
+			Set<Class<?>> result = new HashSet<Class<?>>();
+			result.addAll(registeredKryoTypes);
+			result.addAll(registeredPojoTypes);
+			return result;
+		} else {
+			return registeredKryoTypes;
+		}
+	}
+
+	/**
+	 * Returns the registered POJO types.
+	 */
+	public Set<Class<?>> getRegisteredPojoTypes() {
+		return registeredPojoTypes;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index ab938c0..e9209a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.Histogram;
@@ -60,6 +61,12 @@ public interface RuntimeContext {
 	 * @return The number of the parallel subtask.
 	 */
 	int getIndexOfThisSubtask();
+
+	/**
+	 * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
+	 * job.
+	 */
+	ExecutionConfig getExecutionConfig();
 	
 	/**
 	 * Gets the ClassLoader to load classes that were are not in system's classpath, but are part of the

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6b755e1..c04548c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -46,23 +47,31 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 
 	private final ClassLoader userCodeClassLoader;
 
+	private final ExecutionConfig executionConfig;
+
 	private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
 	
 	private final DistributedCache distributedCache = new DistributedCache();
 	
 	
-	public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
+	public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
 		this.name = name;
 		this.numParallelSubtasks = numParallelSubtasks;
 		this.subtaskIndex = subtaskIndex;
 		this.userCodeClassLoader = userCodeClassLoader;
+		this.executionConfig = executionConfig;
 	}
 	
-	public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) {
-		this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader);
+	public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, FutureTask<Path>> cpTasks) {
+		this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
 		this.distributedCache.setCopyTasks(cpTasks);
 	}
-	
+
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
 	@Override
 	public String getTaskName() {
 		return this.name;

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index 74fddef..b9c98cd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
@@ -37,12 +38,12 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
 	
 	
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader);
+	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
+		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
 	}
 	
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, cpTasks);
+	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, FutureTask<Path>> cpTasks) {
+		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks);
 	}
 	
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index ea68554..afccd7c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -69,15 +70,18 @@ public class CollectionExecutor {
 	private final ClassLoader classLoader;
 	
 	private final boolean mutableObjectSafeMode;
+
+	private final ExecutionConfig executionConfig;
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public CollectionExecutor() {
-		this(DEFAULT_MUTABLE_OBJECT_SAFE_MODE);
+	public CollectionExecutor(ExecutionConfig executionConfig) {
+		this(executionConfig, DEFAULT_MUTABLE_OBJECT_SAFE_MODE);
 	}
 		
-	public CollectionExecutor(boolean mutableObjectSafeMode) {
+	public CollectionExecutor(ExecutionConfig executionConfig, boolean mutableObjectSafeMode) {
 		this.mutableObjectSafeMode = mutableObjectSafeMode;
+		this.executionConfig = executionConfig;
 		
 		this.intermediateResults = new HashMap<Operator<?>, List<?>>();
 		this.accumulators = new HashMap<String, Accumulator<?,?>>();
@@ -161,13 +165,13 @@ public class CollectionExecutor {
 		@SuppressWarnings("unchecked")
 		GenericDataSinkBase<IN> typedSink = (GenericDataSinkBase<IN>) sink;
 
-		typedSink.executeOnCollections(input);
+		typedSink.executeOnCollections(input, executionConfig);
 	}
 	
 	private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source) throws Exception {
 		@SuppressWarnings("unchecked")
 		GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
-		return typedSource.executeOnCollections(mutableObjectSafeMode);
+		return typedSource.executeOnCollections(executionConfig, mutableObjectSafeMode);
 	}
 	
 	private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> operator, int superStep) throws Exception {
@@ -185,8 +189,8 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader()) :
-					new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig) :
+					new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -196,7 +200,7 @@ public class CollectionExecutor {
 			ctx = null;
 		}
 		
-		List<OUT> result = typedOp.executeOnCollections(inputData, ctx, mutableObjectSafeMode);
+		List<OUT> result = typedOp.executeOnCollections(inputData, ctx, executionConfig);
 		
 		if (ctx != null) {
 			AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators());
@@ -227,8 +231,8 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader) :
-				new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) :
+				new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -238,7 +242,7 @@ public class CollectionExecutor {
 			ctx = null;
 		}
 		
-		List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, mutableObjectSafeMode);
+		List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig);
 		
 		if (ctx != null) {
 			AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators());
@@ -349,7 +353,7 @@ public class CollectionExecutor {
 
 		int[] keyColumns = iteration.getSolutionSetKeyFields();
 		boolean[] inputOrderings = new boolean[keyColumns.length];
-		TypeComparator<T> inputComparator = ((CompositeType<T>) solutionType).createComparator(keyColumns, inputOrderings, 0);
+		TypeComparator<T> inputComparator = ((CompositeType<T>) solutionType).createComparator(keyColumns, inputOrderings, 0, executionConfig);
 
 		Map<TypeComparable<T>, T> solutionMap = new HashMap<TypeComparable<T>, T>(solutionInputData.size());
 		// fill the solution from the initial input
@@ -482,8 +486,8 @@ public class CollectionExecutor {
 
 		private final int superstep;
 
-		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader) {
-			super(name, numParallelSubtasks, subtaskIndex, classloader);
+		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader, ExecutionConfig executionConfig) {
+			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig);
 			this.superstep = superstep;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
index 9cdea6d..f43f847 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -286,5 +287,5 @@ public abstract class DualInputOperator<IN1, IN2, OUT, FT extends Function> exte
 	
 	// --------------------------------------------------------------------------------------------
 	
-	protected abstract List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception;
+	protected abstract List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index 242e83d..2f8a396 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.InitializeOnMaster;
@@ -298,7 +299,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	protected void executeOnCollections(List<IN> inputData) throws Exception {
+	protected void executeOnCollections(List<IN> inputData, ExecutionConfig executionConfig) throws Exception {
 		OutputFormat<IN> format = this.formatWrapper.getUserCodeObject();
 		TypeInformation<IN> inputType = getInput().getOperatorInfo().getOutputType();
 
@@ -308,9 +309,9 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 
 			final TypeComparator<IN> sortComparator;
 			if (inputType instanceof CompositeType) {
-				sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0);
+				sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
 			} else if (inputType instanceof AtomicType) {
-				sortComparator = ((AtomicType) inputType).createComparator(sortOrderings[0]);
+				sortComparator = ((AtomicType) inputType).createComparator(sortOrderings[0], executionConfig);
 			} else {
 				throw new UnsupportedOperationException("Local output sorting does not support type "+inputType+" yet.");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index ad1b2e4..13c5dad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -22,6 +22,7 @@ package org.apache.flink.api.common.operators;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -176,7 +177,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 	
 	// --------------------------------------------------------------------------------------------
 	
-	protected List<OUT> executeOnCollections(boolean mutableObjectSafe) throws Exception {
+	protected List<OUT> executeOnCollections(ExecutionConfig executionConfig, boolean mutableObjectSafe) throws Exception {
 		@SuppressWarnings("unchecked")
 		InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
 		inputFormat.configure(this.parameters);
@@ -185,7 +186,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 		
 		// splits
 		InputSplit[] splits = inputFormat.createInputSplits(1);
-		TypeSerializer<OUT> serializer = getOperatorInfo().getOutputType().createSerializer();
+		TypeSerializer<OUT> serializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
 		
 		for (InputSplit split : splits) {
 			inputFormat.open(split);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
index eddf89b..ada4ab0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -203,5 +204,5 @@ public abstract class SingleInputOperator<IN, OUT, FT extends Function> extends
 	
 	// --------------------------------------------------------------------------------------------
 	
-	protected abstract List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception;
+	protected abstract List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index d7d0e20..9586c5d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -47,7 +48,7 @@ public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> {
 	}
 
 	@Override
-	protected List<T> executeOnCollections(List<T> inputData1, List<T> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+	protected List<T> executeOnCollections(List<T> inputData1, List<T> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
 		ArrayList<T> result = new ArrayList<T>(inputData1.size() + inputData2.size());
 		result.addAll(inputData1);
 		result.addAll(inputData2);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 31080cd..6304197 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -298,7 +299,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich
 	}
 
 	@Override
-	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index 65b9d1c..4165f3d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.Partitioner;
@@ -186,7 +187,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx, boolean mutableObjectSafe) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		// --------------------------------------------------------------------
 		// Setup
 		// --------------------------------------------------------------------
@@ -196,17 +197,19 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		// for the grouping / merging comparator
 		int[] inputKeys1 = getKeyColumns(0);
 		int[] inputKeys2 = getKeyColumns(1);
+
+		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
 		
 		boolean[] inputDirections1 = new boolean[inputKeys1.length];
 		boolean[] inputDirections2 = new boolean[inputKeys2.length];
 		Arrays.fill(inputDirections1, true);
 		Arrays.fill(inputDirections2, true);
 		
-		final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer();
-		final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer();
+		final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer(executionConfig);
+		final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer(executionConfig);
 		
-		final TypeComparator<IN1> inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputDirections1);
-		final TypeComparator<IN2> inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputDirections2);
+		final TypeComparator<IN1> inputComparator1 = getTypeComparator(executionConfig, inputType1, inputKeys1, inputDirections1);
+		final TypeComparator<IN2> inputComparator2 = getTypeComparator(executionConfig, inputType2, inputKeys2, inputDirections2);
 		
 		final TypeComparator<IN1> inputSortComparator1;
 		final TypeComparator<IN2> inputSortComparator2;
@@ -227,7 +230,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 			Arrays.fill(allSortDirections, 0, inputKeys1.length, true);
 			System.arraycopy(groupSortDirections, 0, allSortDirections, inputKeys1.length, groupSortDirections.length);
 			
-			inputSortComparator1 = getTypeComparator(inputType1, allSortKeys, allSortDirections);
+			inputSortComparator1 = getTypeComparator(executionConfig, inputType1, allSortKeys, allSortDirections);
 		}
 		
 		if (groupOrder2 == null || groupOrder2.getNumberOfFields() == 0) {
@@ -246,12 +249,12 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 			Arrays.fill(allSortDirections, 0, inputKeys2.length, true);
 			System.arraycopy(groupSortDirections, 0, allSortDirections, inputKeys2.length, groupSortDirections.length);
 			
-			inputSortComparator2 = getTypeComparator(inputType2, allSortKeys, allSortDirections);
+			inputSortComparator2 = getTypeComparator(executionConfig, inputType2, allSortKeys, allSortDirections);
 		}
 
 		CoGroupSortListIterator<IN1, IN2> coGroupIterator =
 				new CoGroupSortListIterator<IN1, IN2>(input1, inputSortComparator1, inputComparator1, inputSerializer1,
-						input2, inputSortComparator2, inputComparator2, inputSerializer2, mutableObjectSafe);
+						input2, inputSortComparator2, inputComparator2, inputSerializer2, objectReuseDisabled);
 
 		// --------------------------------------------------------------------
 		// Run UDF
@@ -262,8 +265,8 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		FunctionUtils.openFunction(function, parameters);
 
 		List<OUT> result = new ArrayList<OUT>();
-		Collector<OUT> resultCollector = mutableObjectSafe ?
-				new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer()) :
+		Collector<OUT> resultCollector = objectReuseDisabled ?
+				new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer(executionConfig)) :
 				new ListCollector<OUT>(result);
 
 		while (coGroupIterator.next()) {
@@ -275,12 +278,12 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		return result;
 	}
 
-	private <T> TypeComparator<T> getTypeComparator(TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
+	private <T> TypeComparator<T> getTypeComparator(ExecutionConfig executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
 		if (!(inputType instanceof CompositeType)) {
 			throw new InvalidProgramException("Input types of coGroup must be composite types.");
 		}
 
-		return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0);
+		return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig);
 	}
 
 	private static class CoGroupSortListIterator<IN1, IN2> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index 8ad91c6..62bdfbe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.SingleInputOperator;
@@ -52,7 +53,7 @@ public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) {
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index c6ceef0..f20659c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -84,18 +85,20 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2,
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		CrossFunction<IN1, IN2, OUT> function = this.userFunction.getUserCodeObject();
 		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
+
+		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * inputData2.size());
 		
-		if (mutableObjectSafeMode) {
-			TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer();
-			TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer();
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+		if (objectReuseDisabled) {
+			TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
+			TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
 			
 			for (IN1 element1 : inputData1) {
 				for (IN2 element2 : inputData2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index f945b1d..2986534 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -332,7 +333,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 	}
 
 	@Override
-	protected List<ST> executeOnCollections(List<ST> inputData1, List<WT> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+	protected List<ST> executeOnCollections(List<ST> inputData1, List<WT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
index f4bd537..4db5265 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -50,7 +51,7 @@ public class FilterOperatorBase<T, FT extends FlatMapFunction<T, T>> extends Sin
 	}
 
 	@Override
-	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		FlatMapFunction<T, T> function = this.userFunction.getUserCodeObject();
 		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
index 8312a99..615ba87 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
@@ -53,17 +54,19 @@ public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> e
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();
 		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, parameters);
 
 		ArrayList<OUT> result = new ArrayList<OUT>(input.size());
+
+		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
 		
-		if (mutableObjectSafeMode) {
-			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer();
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+		if (objectReuseDisabled) {
+			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
 			
 			CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index ddfd874..f4f7d0f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -152,9 +153,11 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		GroupReduceFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 
+		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
+
 		UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
 		TypeInformation<IN> inputType = operatorInfo.getInputType();
 
@@ -176,7 +179,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 			if(sortColumns.length == 0) { // => all reduce. No comparator
 				Preconditions.checkArgument(sortOrderings.length == 0);
 			} else {
-				final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0);
+				final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
 	
 				Collections.sort(inputData, new Comparator<IN>() {
 					@Override
@@ -193,9 +196,9 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		ArrayList<OUT> result = new ArrayList<OUT>();
 
 		if (keyColumns.length == 0) {
-			if (mutableObjectSafeMode) {
-				final TypeSerializer<IN> inputSerializer = inputType.createSerializer();
-				TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+			if (objectReuseDisabled) {
+				final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+				TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
 				List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
 				for (IN in: inputData) {
 					inputDataCopy.add(inputSerializer.copy(in));
@@ -208,14 +211,14 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 				function.reduce(inputData, collector);
 			}
 		} else {
-			final TypeSerializer<IN> inputSerializer = inputType.createSerializer();
+			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
 			boolean[] keyOrderings = new boolean[keyColumns.length];
-			final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0);
+			final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
 
-			ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, mutableObjectSafeMode);
+			ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, objectReuseDisabled);
 
-			if (mutableObjectSafeMode) {
-				TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+			if (objectReuseDisabled) {
+				TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
 				CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
 
 				while (keyedIterator.nextKey()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index 555175d..373846f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -139,7 +140,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 
 	@SuppressWarnings("unchecked")
 	@Override
-	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafe) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception {
 		FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
 
 		FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
@@ -148,22 +149,24 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 		TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
 		TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
 		TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
+
+		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
 		
-		TypeSerializer<IN1> leftSerializer = mutableObjectSafe ? leftInformation.createSerializer() : null;
-		TypeSerializer<IN2> rightSerializer = mutableObjectSafe ? rightInformation.createSerializer() : null;
+		TypeSerializer<IN1> leftSerializer = objectReuseDisabled ? leftInformation.createSerializer(executionConfig) : null;
+		TypeSerializer<IN2> rightSerializer = objectReuseDisabled ? rightInformation.createSerializer(executionConfig) : null;
 		
 		TypeComparator<IN1> leftComparator;
 		TypeComparator<IN2> rightComparator;
 
 		if (leftInformation instanceof AtomicType) {
-			leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true);
+			leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true, executionConfig);
 		}
 		else if (leftInformation instanceof CompositeType) {
 			int[] keyPositions = getKeyColumns(0);
 			boolean[] orders = new boolean[keyPositions.length];
 			Arrays.fill(orders, true);
 
-			leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0);
+			leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0, executionConfig);
 		}
 		else {
 			throw new RuntimeException("Type information for left input of type " + leftInformation.getClass()
@@ -171,14 +174,14 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 		}
 
 		if (rightInformation instanceof AtomicType) {
-			rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true);
+			rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true, executionConfig);
 		}
 		else if (rightInformation instanceof CompositeType) {
 			int[] keyPositions = getKeyColumns(1);
 			boolean[] orders = new boolean[keyPositions.length];
 			Arrays.fill(orders, true);
 
-			rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0);
+			rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0, executionConfig);
 		}
 		else {
 			throw new RuntimeException("Type information for right input of type " + rightInformation.getClass()
@@ -188,7 +191,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 		TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
 
 		List<OUT> result = new ArrayList<OUT>();
-		Collector<OUT> collector = mutableObjectSafe ? new CopyingListCollector<OUT>(result, outInformation.createSerializer())
+		Collector<OUT> collector = objectReuseDisabled ? new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig))
 														: new ListCollector<OUT>(result);
 
 		Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>();
@@ -212,7 +215,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 				pairComparator.setReference(left);
 				for (IN2 right : matchingHashes) {
 					if (pairComparator.equalToReference(right)) {
-						if (mutableObjectSafe) {
+						if (objectReuseDisabled) {
 							function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
 						} else {
 							function.join(left, right, collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
index 0218bfa..cde3b74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -54,17 +55,19 @@ public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>> extends S
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		MapFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size());
+
+		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
 		
-		if (mutableObjectSafeMode) {
-			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer();
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+		if (objectReuseDisabled) {
+			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
 			
 			for (IN element : inputData) {
 				IN inCopy = inSerializer.copy(element);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
index 7c1fcef..25b3bb8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingIterator;
@@ -57,17 +58,19 @@ public class MapPartitionOperatorBase<IN, OUT, FT extends MapPartitionFunction<I
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		MapPartitionFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size() / 4);
+
+		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
 		
-		if (mutableObjectSafeMode) {
-			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer();
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+		if (objectReuseDisabled) {
+			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
 			
 			CopyingIterator<IN> source = new CopyingIterator<IN>(inputData.iterator(), inSerializer);
 			CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
index 3602a82..f91d7d8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
@@ -88,7 +89,7 @@ public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpF
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	protected List<IN> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+	protected List<IN> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
 		return inputData;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index f1bf0e9..d3d61e9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -149,12 +150,13 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		// make sure we can handle empty inputs
 		if (inputData.isEmpty()) {
 			return Collections.emptyList();
 		}
-		
+
+		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
 		ReduceFunction<T> function = this.userFunction.getUserCodeObject();
 
 		UnaryOperatorInformation<T, T> operatorInfo = getOperatorInfo();
@@ -169,11 +171,11 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
 
-		TypeSerializer<T> serializer = getOperatorInfo().getInputType().createSerializer();
+		TypeSerializer<T> serializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
 
 		if (inputColumns.length > 0) {
 			boolean[] inputOrderings = new boolean[inputColumns.length];
-			TypeComparator<T> inputComparator = ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0);
+			TypeComparator<T> inputComparator = ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0, executionConfig);
 
 			Map<TypeComparable<T>, T> aggregateMap = new HashMap<TypeComparable<T>, T>(inputData.size() / 10);
 
@@ -183,7 +185,7 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 				T existing = aggregateMap.get(wrapper);
 				T result;
 
-				if (mutableObjectSafeMode) {
+				if (objectReuseDisabled) {
 					if (existing != null) {
 						result = function.reduce(existing, serializer.copy(next));
 					} else {
@@ -209,7 +211,7 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 		else {
 			T aggregate = inputData.get(0);
 			
-			if (mutableObjectSafeMode) {
+			if (objectReuseDisabled) {
 				aggregate = serializer.copy(aggregate);
 				
 				for (int i = 1; i < inputData.size(); i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
index 10dbbfe..2fc8a1b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 
 
@@ -26,5 +27,5 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
  */
 public interface AtomicType<T> {
 	
-	TypeComparator<T> createComparator(boolean sortOrderAscending);
+	TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 646a549..80f5f63 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.typeinfo;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -94,12 +95,12 @@ public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public TypeSerializer<T> createSerializer() {
+	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
 		// special case the string array
 		if (componentClass.equals(String.class)) {
 			return (TypeSerializer<T>) StringArraySerializer.INSTANCE;
 		} else {
-			return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer());
+			return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer(executionConfig));
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index 61d830a..7bf7298 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -23,6 +23,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -114,12 +115,12 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 	}
 	
 	@Override
-	public TypeSerializer<T> createSerializer() {
+	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
 		return this.serializer;
 	}
 	
 	@Override
-	public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
 		if (comparatorClass != null) {
 			return instantiateComparator(comparatorClass, sortOrderAscending);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
index dba0e6f..367670c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.types.Nothing;
 
@@ -54,7 +55,7 @@ public class NothingTypeInfo extends TypeInformation<Nothing> {
 	}
 
 	@Override
-	public TypeSerializer<Nothing> createSerializer() {
+	public TypeSerializer<Nothing> createSerializer(ExecutionConfig executionConfig) {
 		throw new RuntimeException("The Nothing type cannot have a serializer.");
 	}
 }


Mime
View raw message