flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [20/23] git commit: [FLINK-1110] By default, collection-based execution behaves mutable-object safe.
Date Fri, 03 Oct 2014 16:25:16 GMT
[FLINK-1110] By default, collection-based execution behaves mutable-object safe.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ac69cb3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ac69cb3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ac69cb3e

Branch: refs/heads/master
Commit: ac69cb3ef1f3ddc5b838d11dba06a05255a543e0
Parents: 3fd3110
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Sep 30 21:32:54 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Oct 3 16:22:34 2014 +0200

----------------------------------------------------------------------
 .../flink/compiler/util/NoOpBinaryUdfOp.java    |   2 +-
 .../flink/compiler/util/NoOpUnaryUdfOp.java     |   2 +-
 .../common/functions/util/CopyingIterator.java  |  63 ++++++++++
 .../functions/util/CopyingListCollector.java    |  43 +++++++
 .../common/operators/CollectionExecutor.java    |  13 ++-
 .../api/common/operators/DualInputOperator.java |   2 +-
 .../common/operators/SingleInputOperator.java   |   2 +-
 .../flink/api/common/operators/Union.java       |   2 +-
 .../operators/base/BulkIterationBase.java       |   2 +-
 .../operators/base/CoGroupOperatorBase.java     |  25 ++--
 .../base/CollectorMapOperatorBase.java          |   2 +-
 .../operators/base/CrossOperatorBase.java       |  27 ++++-
 .../operators/base/DeltaIterationBase.java      |   3 +-
 .../common/operators/base/FileDataSinkBase.java |   5 +-
 .../operators/base/FilterOperatorBase.java      |   2 +-
 .../operators/base/FlatMapOperatorBase.java     |  26 ++++-
 .../operators/base/GroupReduceOperatorBase.java |  42 ++++---
 .../common/operators/base/JoinOperatorBase.java |  41 ++++---
 .../common/operators/base/MapOperatorBase.java  |  21 +++-
 .../base/MapPartitionOperatorBase.java          |  21 +++-
 .../operators/base/PartitionOperatorBase.java   |   7 ++
 .../operators/base/ReduceOperatorBase.java      |  47 +++++---
 .../operators/util/ListKeyGroupedIterator.java  |  22 ++--
 .../base/FlatMapOperatorCollectionTest.java     |  33 ++++--
 .../operators/base/JoinOperatorBaseTest.java    |  19 +--
 .../common/operators/base/MapOperatorTest.java  |  12 +-
 .../base/PartitionMapOperatorTest.java          |   7 +-
 .../CollectionExecutionAccumulatorsTest.java    |   2 +-
 .../CollectionExecutionIterationTest.java       |   6 +-
 ...ctionExecutionWithBroadcastVariableTest.java |   2 +-
 .../base/CoGroupOperatorCollectionTest.java     |  37 ++++--
 .../operators/base/GroupReduceOperatorTest.java |  28 +++--
 .../operators/base/JoinOperatorBaseTest.java    | 115 ++++++++++++++++++
 .../operators/base/ReduceOperatorTest.java      |  28 +++--
 .../operators/base/JoinOperatorBaseTest.java    | 116 -------------------
 35 files changed, 550 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
index 1abaeac..4b48ec7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
@@ -43,7 +43,7 @@ public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpF
 	}
 
 	@Override
-	protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext) {
+	protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, boolean mutables) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
index 8f88168..474d3a4 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
@@ -54,7 +54,7 @@ public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunct
 	}
 
 	@Override
-	protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext) {
+	protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, boolean mutables) {
 		return inputData;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
new file mode 100644
index 0000000..b7a8dc9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.TraversableOnceException;
+
+public class CopyingIterator<E> implements Iterator<E>, Iterable<E> {
+
+	private final Iterator<E> source;
+	private final TypeSerializer<E> serializer;
+	
+	private boolean available = true;
+	
+	public CopyingIterator(Iterator<E> source, TypeSerializer<E> serializer) {
+		this.source = source;
+		this.serializer = serializer;
+	}
+
+	@Override
+	public Iterator<E> iterator() {
+		if (available) {
+			available = false;
+			return this;
+		} else {
+			throw new TraversableOnceException();
+		}
+	}
+
+	@Override
+	public boolean hasNext() {
+		return source.hasNext();
+	}
+
+	@Override
+	public E next() {
+		E next = source.next();
+		return serializer.copy(next);
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
new file mode 100644
index 0000000..8620981
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+public class CopyingListCollector<T> implements Collector<T> {
+
+	private final List<T> list;
+	private final TypeSerializer<T> serializer;
+
+	public CopyingListCollector(List<T> list, TypeSerializer<T> serializer) {
+		this.list = list;
+		this.serializer = serializer;
+	}
+
+	@Override
+	public void collect(T record) {
+		list.add(serializer.copy(record));
+	}
+
+	@Override
+	public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index a748a5e..5d30df4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -52,13 +52,22 @@ import org.apache.flink.util.Visitor;
  */
 public class CollectionExecutor {
 	
+	private static final boolean DEFAULT_MUTABLE_OBJECT_SAFE_MODE = true;
+	
 	private final Map<Operator<?>, List<?>> intermediateResults;
 	
 	private final Map<String, Accumulator<?, ?>> accumulators;
 	
+	private final boolean mutableObjectSafeMode;
+	
 	// --------------------------------------------------------------------------------------------
 	
 	public CollectionExecutor() {
+		this(DEFAULT_MUTABLE_OBJECT_SAFE_MODE);
+	}
+		
+	public CollectionExecutor(boolean mutableObjectSafeMode) {
+		this.mutableObjectSafeMode = mutableObjectSafeMode;
 		this.intermediateResults = new HashMap<Operator<?>, List<?>>();
 		this.accumulators = new HashMap<String, Accumulator<?,?>>();
 	}
@@ -172,7 +181,7 @@ public class CollectionExecutor {
 			ctx = null;
 		}
 		
-		List<OUT> result = typedOp.executeOnCollections(inputData, ctx);
+		List<OUT> result = typedOp.executeOnCollections(inputData, ctx, mutableObjectSafeMode);
 		
 		if (ctx != null) {
 			AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators());
@@ -214,7 +223,7 @@ public class CollectionExecutor {
 			ctx = null;
 		}
 		
-		List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx);
+		List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, mutableObjectSafeMode);
 		
 		if (ctx != null) {
 			AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
index 2d28180..6325788 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
@@ -286,5 +286,5 @@ public abstract class DualInputOperator<IN1, IN2, OUT, FT extends Function> exte
 	
 	// --------------------------------------------------------------------------------------------
 	
-	protected abstract List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext) throws Exception;
+	protected abstract List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
index fa923e7..f1bf2ad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
@@ -209,5 +209,5 @@ public abstract class SingleInputOperator<IN, OUT, FT extends Function> extends
 	
 	// --------------------------------------------------------------------------------------------
 	
-	protected abstract List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext) throws Exception;
+	protected abstract List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index f4b3c51..fb8626d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -48,7 +48,7 @@ public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> {
 	}
 
 	@Override
-	protected List<T> executeOnCollections(List<T> inputData1, List<T> inputData2, RuntimeContext runtimeContext) {
+	protected List<T> executeOnCollections(List<T> inputData1, List<T> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
 		ArrayList<T> result = new ArrayList<T>(inputData1.size() + inputData2.size());
 		result.addAll(inputData1);
 		result.addAll(inputData2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 31b5191..4fbf65e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -303,7 +303,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich
 	}
 
 	@Override
-	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext runtimeContext) {
+	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index 4b6d639..6aa3da0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -35,6 +36,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
@@ -177,7 +179,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx, boolean mutableObjectSafe) throws Exception {
 		// --------------------------------------------------------------------
 		// Setup
 		// --------------------------------------------------------------------
@@ -193,11 +195,15 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		Arrays.fill(inputSortDirections1, true);
 		Arrays.fill(inputSortDirections2, true);
 
+		final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer();
+		final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer();
+		
 		final TypeComparator<IN1> inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputSortDirections1);
 		final TypeComparator<IN2> inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputSortDirections2);
 
 		CoGroupSortListIterator<IN1, IN2> coGroupIterator =
-				new CoGroupSortListIterator<IN1, IN2>(input1, inputComparator1, input2, inputComparator2);
+				new CoGroupSortListIterator<IN1, IN2>(input1, inputComparator1, inputSerializer1,
+						input2, inputComparator2, inputSerializer2, mutableObjectSafe);
 
 		// --------------------------------------------------------------------
 		// Run UDF
@@ -208,7 +214,9 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		FunctionUtils.openFunction(function, parameters);
 
 		List<OUT> result = new ArrayList<OUT>();
-		Collector<OUT> resultCollector = new ListCollector<OUT>(result);
+		Collector<OUT> resultCollector = mutableObjectSafe ?
+				new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer()) :
+				new ListCollector<OUT>(result);
 
 		while (coGroupIterator.next()) {
 			function.coGroup(coGroupIterator.getValues1(), coGroupIterator.getValues2(), resultCollector);
@@ -247,13 +255,14 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		private Iterable<IN2> secondReturn;
 
 		private CoGroupSortListIterator(
-				List<IN1> input1, final TypeComparator<IN1> inputComparator1,
-				List<IN2> input2, final TypeComparator<IN2> inputComparator2) {
-
+				List<IN1> input1, final TypeComparator<IN1> inputComparator1, TypeSerializer<IN1> serializer1,
+				List<IN2> input2, final TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2,
+				boolean copyElements)
+		{
 			this.pairComparator = new GenericPairComparator<IN1, IN2>(inputComparator1, inputComparator2);
 
-			this.iterator1 = new ListKeyGroupedIterator<IN1>(input1, inputComparator1);
-			this.iterator2 = new ListKeyGroupedIterator<IN2>(input2, inputComparator2);
+			this.iterator1 = new ListKeyGroupedIterator<IN1>(input1, serializer1, inputComparator1, copyElements);
+			this.iterator2 = new ListKeyGroupedIterator<IN2>(input2, serializer2, inputComparator2, copyElements);
 
 			// ----------------------------------------------------------------
 			// Sort

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index c3e00bc..8ad91c6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -52,7 +52,7 @@ public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx) {
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index c70460a..8f49518 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  * @see org.apache.flink.api.common.functions.CrossFunction
@@ -50,21 +51,37 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2,
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
 		CrossFunction<IN1, IN2, OUT> function = this.userFunction.getUserCodeObject();
 		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * inputData2.size());
-		for (IN1 element1 : inputData1) {
-			for (IN2 element2 : inputData2) {
-				result.add(function.cross(element1, element2));
+		
+		if (mutableObjectSafeMode) {
+			TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer();
+			TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer();
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+			
+			for (IN1 element1 : inputData1) {
+				for (IN2 element2 : inputData2) {
+					IN1 copy1 = inSerializer1.copy(element1);
+					IN2 copy2 = inSerializer2.copy(element2);
+					OUT o = function.cross(copy1, copy2);
+					result.add(outSerializer.copy(o));
+				}
+			}
+		}
+		else {
+			for (IN1 element1 : inputData1) {
+				for (IN2 element2 : inputData2) {
+					result.add(function.cross(element1, element2));
+				}
 			}
 		}
 		
 		FunctionUtils.closeFunction(function);
-		
 		return result;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index fddf6e7..f945b1d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
 import java.util.Collections;
@@ -333,7 +332,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 	}
 
 	@Override
-	protected List<ST> executeOnCollections(List<ST> inputData1, List<WT> inputData2, RuntimeContext runtimeContext) throws Exception {
+	protected List<ST> executeOnCollections(List<ST> inputData1, List<WT> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java
index a36d44b..8f5c4e3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.io.FileOutputFormat;
@@ -72,8 +71,7 @@ public class FileDataSinkBase<IN> extends GenericDataSinkBase<IN> {
 	 * 
 	 * @return The path to which the output shall be written.
 	 */
-	public String getFilePath()
-	{
+	public String getFilePath() {
 		return this.filePath;
 	}
 	
@@ -82,5 +80,4 @@ public class FileDataSinkBase<IN> extends GenericDataSinkBase<IN> {
 	public String toString() {
 		return this.filePath;
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
index 39f8984..7f7add2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
@@ -50,7 +50,7 @@ public class FilterOperatorBase<T, FT extends FlatMapFunction<T, T>> extends Sin
 	}
 
 	@Override
-	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx) throws Exception {
+	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
 		FlatMapFunction<T, T> function = this.userFunction.getUserCodeObject();
 		
 		FunctionUtils.openFunction(function, this.parameters);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
index 81f3bcf..8312a99 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.SingleInputOperator;
@@ -27,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -51,17 +53,29 @@ public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> e
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
 		FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();
-
+		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, parameters);
 
 		ArrayList<OUT> result = new ArrayList<OUT>(input.size());
-		ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
-
-		for (IN element : input) {
-			function.flatMap(element, resultCollector);
+		
+		if (mutableObjectSafeMode) {
+			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer();
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+			
+			CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
+			
+			for (IN element : input) {
+				IN inCopy = inSerializer.copy(element);
+				function.flatMap(inCopy, resultCollector);
+			}
+		} else {
+			ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
+			for (IN element : input) {
+				function.flatMap(element, resultCollector);
+			}
 		}
 
 		FunctionUtils.closeFunction(function);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 6abd7b5..5d3b92d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.api.common.operators.base;
 
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.Ordering;
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.CompositeType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -131,20 +132,21 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx)
-			throws Exception {
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
 		GroupReduceFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 
 		UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
 		TypeInformation<IN> inputType = operatorInfo.getInputType();
 
 		if (!(inputType instanceof CompositeType)) {
-			throw new InvalidProgramException("Input type of groupReduce operation must be" +
-					" composite type.");
+			throw new InvalidProgramException("Input type of groupReduce operation must be a composite type.");
 		}
 
 		int[] inputColumns = getKeyColumns(0);
 		boolean[] inputOrderings = new boolean[inputColumns.length];
+		
+		final TypeSerializer<IN> inputSerializer = inputType.createSerializer();
+				
 		@SuppressWarnings("unchecked")
 		final TypeComparator<IN> inputComparator =
 				((CompositeType<IN>) inputType).createComparator(inputColumns, inputOrderings);
@@ -152,26 +154,34 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
 
-
-		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size());
-		ListCollector<OUT> collector = new ListCollector<OUT>(result);
-
 		Collections.sort(inputData, new Comparator<IN>() {
 			@Override
 			public int compare(IN o1, IN o2) {
 				return inputComparator.compare(o2, o1);
 			}
 		});
-		ListKeyGroupedIterator<IN> keyedIterator =
-				new ListKeyGroupedIterator<IN>(inputData, inputComparator);
-
-		while (keyedIterator.nextKey()) {
-			function.reduce(keyedIterator.getValues(), collector);
+		
+		ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(
+				inputData, inputSerializer, inputComparator, mutableObjectSafeMode);
+		
+		ArrayList<OUT> result = new ArrayList<OUT>();
+		
+		if (mutableObjectSafeMode) {
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+			CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+			
+			while (keyedIterator.nextKey()) {
+				function.reduce(keyedIterator.getValues(), collector);
+			}
+		}
+		else {
+			ListCollector<OUT> collector = new ListCollector<OUT>(result);
+			while (keyedIterator.nextKey()) {
+				function.reduce(keyedIterator.getValues(), collector);
+			}
 		}
 
 		FunctionUtils.closeFunction(function);
-
 		return result;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index 34ede65..3d5cf72 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -34,6 +34,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -60,7 +62,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 
 	@SuppressWarnings("unchecked")
 	@Override
-	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafe) throws Exception {
 		FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
 
 		FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
@@ -68,13 +70,18 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 
 		TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
 		TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
-
+		TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
+		
+		TypeSerializer<IN1> leftSerializer = mutableObjectSafe ? leftInformation.createSerializer() : null;
+		TypeSerializer<IN2> rightSerializer = mutableObjectSafe ? rightInformation.createSerializer() : null;
+		
 		TypeComparator<IN1> leftComparator;
 		TypeComparator<IN2> rightComparator;
 
-		if(leftInformation instanceof AtomicType){
+		if (leftInformation instanceof AtomicType){
 			leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true);
-		}else if(leftInformation instanceof CompositeType){
+		}
+		else if(leftInformation instanceof CompositeType){
 			int[] keyPositions = getKeyColumns(0);
 			boolean[] orders = new boolean[keyPositions.length];
 			Arrays.fill(orders, true);
@@ -102,12 +109,13 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 				rightComparator);
 
 		List<OUT> result = new ArrayList<OUT>();
-		ListCollector<OUT> collector = new ListCollector<OUT>(result);
+		Collector<OUT> collector = mutableObjectSafe ? new CopyingListCollector<OUT>(result, outInformation.createSerializer())
+														: new ListCollector<OUT>(result);
 
 		Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>();
 
-		//Build probe table
-		for(IN2 element: inputData2){
+		//Build hash table
+		for (IN2 element: inputData2){
 			List<IN2> list = probeTable.get(rightComparator.hash(element));
 			if(list == null){
 				list = new ArrayList<IN2>();
@@ -118,15 +126,18 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 		}
 
 		//Probing
-		for(IN1 left: inputData1){
+		for (IN1 left: inputData1) {
 			List<IN2> matchingHashes = probeTable.get(leftComparator.hash(left));
 
-			pairComparator.setReference(left);
-
-			if(matchingHashes != null){
-				for(IN2 right: matchingHashes){
-					if(pairComparator.equalToReference(right)){
-						function.join(left, right, collector);
+			if (matchingHashes != null) {
+				pairComparator.setReference(left);
+				for (IN2 right : matchingHashes){
+					if (pairComparator.equalToReference(right)) {
+						if (mutableObjectSafe) {
+							function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
+						} else {
+							function.join(left, right, collector);
+						}
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
index d545676..0218bfa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
 import java.util.ArrayList;
@@ -30,7 +29,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  *
@@ -55,15 +54,27 @@ public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>> extends S
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
 		MapFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size());
-		for (IN element : inputData) {
-			result.add(function.map(element));
+		
+		if (mutableObjectSafeMode) {
+			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer();
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+			
+			for (IN element : inputData) {
+				IN inCopy = inSerializer.copy(element);
+				OUT out = function.map(inCopy);
+				result.add(outSerializer.copy(out));
+			}
+		} else {
+			for (IN element : inputData) {
+				result.add(function.map(element));
+			}
 		}
 		
 		FunctionUtils.closeFunction(function);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
index 21fa9be..7c1fcef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
@@ -23,6 +23,8 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingIterator;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.SingleInputOperator;
@@ -30,6 +32,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
  *
@@ -54,18 +57,28 @@ public class MapPartitionOperatorBase<IN, OUT, FT extends MapPartitionFunction<I
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx) throws Exception {
+	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
 		MapPartitionFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 		
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size() / 4);
-		ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
 		
-		function.mapPartition(inputData, resultCollector);
+		if (mutableObjectSafeMode) {
+			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer();
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer();
+			
+			CopyingIterator<IN> source = new CopyingIterator<IN>(inputData.iterator(), inSerializer);
+			CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
+			
+			function.mapPartition(source, resultCollector);
+		} else {
+			ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
+			function.mapPartition(inputData, resultCollector);
+		}
+
 		result.trimToSize();
-		
 		FunctionUtils.closeFunction(function);
 		return result;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
index 1f17db0..af8a111 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import java.util.List;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -51,4 +54,8 @@ public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpF
 		RANGE;
 	}
 
+	@Override
+	protected List<IN> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+		return inputData;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index f7975a1..1f192f8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.InvalidProgramException;
@@ -32,13 +31,14 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.CompositeType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-
 /**
  * Base data flow operator for Reduce user-defined functions. Accepts reduce functions
  * and key positions. The key positions are expected in the flattened common data model.
@@ -123,20 +123,23 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
 	}
 
-// --------------------------------------------------------------------------------------------
+	// --------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("unchecked")
 	@Override
-	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx)
-			throws Exception {
+	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+		// make sure we can handle empty inputs
+		if (inputData.isEmpty()) {
+			return Collections.emptyList();
+		}
+		
 		ReduceFunction<T> function = this.userFunction.getUserCodeObject();
 
 		UnaryOperatorInformation<T, T> operatorInfo = getOperatorInfo();
 		TypeInformation<T> inputType = operatorInfo.getInputType();
 
 		if (!(inputType instanceof CompositeType)) {
-			throw new InvalidProgramException("Input type of groupReduce operation must be" +
-					" composite type.");
+			throw new InvalidProgramException("Input type of groupReduce operation must be" + " composite type.");
 		}
 
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
@@ -161,22 +164,30 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 				aggregateMap.put(wrapper, result);
 			}
 
-			List<T> result = new ArrayList<T>(aggregateMap.values().size());
-			result.addAll(aggregateMap.values());
-
 			FunctionUtils.closeFunction(function);
-			return result;
-		} else {
+			return new ArrayList<T>(aggregateMap.values());
+		}
+		else {
 			T aggregate = inputData.get(0);
-			for (int i = 1; i < inputData.size(); i++) {
-				aggregate = function.reduce(aggregate, inputData.get(i));
+			
+			if (mutableObjectSafeMode) {
+				TypeSerializer<T> serializer = getOperatorInfo().getInputType().createSerializer();
+				aggregate = serializer.copy(aggregate);
+				
+				for (int i = 1; i < inputData.size(); i++) {
+					T next = function.reduce(aggregate, serializer.copy(inputData.get(i)));
+					aggregate = serializer.copy(next);
+				}
+			}
+			else {
+				for (int i = 1; i < inputData.size(); i++) {
+					aggregate = function.reduce(aggregate, inputData.get(i));
+				}
 			}
-			List<T> result = new ArrayList<T>(1);
-			result.add(aggregate);
 
 			FunctionUtils.setFunctionRuntimeContext(function, ctx);
-			return result;
+			
+			return Collections.singletonList(aggregate);
 		}
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
index 8a4c919..e530f8a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.util;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -32,14 +33,16 @@ public final class ListKeyGroupedIterator<E> {
 
 	private final List<E> input;
 
+	private final TypeSerializer<E> serializer;  // != null if the elements should be copied
+	
 	private final TypeComparator<E> comparator;
 
 	private ValuesIterator valuesIterator;
 
 	private int currentPosition = 0;
 
-	private E lookahead = null;
-
+	private E lookahead;
+	
 	private boolean done;
 
 	/**
@@ -48,13 +51,13 @@ public final class ListKeyGroupedIterator<E> {
 	 * @param input The list with the input elements.
 	 * @param comparator The comparator for the data type iterated over.
 	 */
-	public ListKeyGroupedIterator(List<E> input, TypeComparator<E> comparator)
-	{
+	public ListKeyGroupedIterator(List<E> input, TypeSerializer<E> serializer, TypeComparator<E> comparator, boolean copy) {
 		if (input == null || comparator == null) {
 			throw new NullPointerException();
 		}
 
 		this.input = input;
+		this.serializer = copy ? serializer : null;
 		this.comparator = comparator;
 
 		this.done = input.isEmpty() ? true : false;
@@ -109,7 +112,7 @@ public final class ListKeyGroupedIterator<E> {
 			E first = input.get(currentPosition++);
 			if (first != null) {
 				this.comparator.setReference(first);
-				this.valuesIterator = new ValuesIterator(first);
+				this.valuesIterator = new ValuesIterator(first, serializer);
 				return true;
 			}
 			else {
@@ -155,9 +158,12 @@ public final class ListKeyGroupedIterator<E> {
 	public final class ValuesIterator implements Iterator<E>, Iterable<E> {
 
 		private E next;
+		
+		private final TypeSerializer<E> serializer;
 
-		private ValuesIterator(E first) {
+		private ValuesIterator(E first, TypeSerializer<E> serializer) {
 			this.next = first;
+			this.serializer = serializer;
 		}
 
 		@Override
@@ -170,7 +176,7 @@ public final class ListKeyGroupedIterator<E> {
 			if (this.next != null) {
 				E current = this.next;
 				this.next = ListKeyGroupedIterator.this.advanceToNext();
-				return current;
+				return serializer != null ? serializer.copy(current) : current;
 			} else {
 				throw new NoSuchElementException();
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index 8227c22..74dc889 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -30,8 +30,8 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 @SuppressWarnings("serial")
@@ -41,25 +41,34 @@ public class FlatMapOperatorCollectionTest implements Serializable {
 	public void testExecuteOnCollection() {
 		try {
 			IdRichFlatMap<String> udf = new IdRichFlatMap<String>();
-			testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k"));
+			testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k"), true);
 			Assert.assertTrue(udf.isClosed);
 
-			testExecuteOnCollection(new IdRichFlatMap<String>(), new ArrayList<String>());
-		} catch (Throwable t) {
-			Assert.fail(t.getMessage());
+			udf = new IdRichFlatMap<String>();
+			testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k"), false);
+			Assert.assertTrue(udf.isClosed);
+			
+			udf = new IdRichFlatMap<String>();
+			testExecuteOnCollection(udf, Collections.<String>emptyList(), true);
+			Assert.assertTrue(udf.isClosed);
+			
+			udf = new IdRichFlatMap<String>();
+			testExecuteOnCollection(udf, Collections.<String>emptyList(), false);
+			Assert.assertTrue(udf.isClosed);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
 	}
 
-	private void testExecuteOnCollection(FlatMapFunction<String, String> udf, List<String> input) throws Exception {
+	private void testExecuteOnCollection(FlatMapFunction<String, String> udf, List<String> input, boolean mutableSafe) throws Exception {
 		// run on collections
 		final List<String> result = getTestFlatMapOperator(udf)
-				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0));
+				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0), mutableSafe);
 
 		Assert.assertEquals(input.size(), result.size());
-
-		for (int i = 0; i < input.size(); i++) {
-			Assert.assertEquals(input.get(i), result.get(i));
-		}
+		Assert.assertEquals(input, result);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index b048cc5..8834989 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -60,10 +60,13 @@ public class JoinOperatorBaseTest implements Serializable {
 		List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6 ,6));
 
 		try {
-			List<Integer> result = base.executeOnCollections(inputData1, inputData2, null);
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, null, true);
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, null, false);
 
-			assertEquals(expected, result);
-		} catch (Exception e) {
+			assertEquals(expected, resultSafe);
+			assertEquals(expected, resultRegular);
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -107,11 +110,13 @@ public class JoinOperatorBaseTest implements Serializable {
 
 
 		try {
-			List<Integer> result = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName,
-					1, 0));
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), true);
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), false);
 
-			assertEquals(expected, result);
-		} catch (Exception e) {
+			assertEquals(expected, resultSafe);
+			assertEquals(expected, resultRegular);
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index bb263ad..82778c5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -52,9 +52,11 @@ public class MapOperatorTest implements java.io.Serializable {
 					parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), "TestMapper");
 			
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
-			List<Integer> result = op.executeOnCollections(input, null);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, null, true);
+			List<Integer> resultRegular = op.executeOnCollections(input, null, false);
 			
-			assertEquals(asList(1, 2, 3, 4, 5, 6), result);
+			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
+			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -95,9 +97,11 @@ public class MapOperatorTest implements java.io.Serializable {
 					parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName);
 			
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
-			List<Integer> result = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0));
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false);
 			
-			assertEquals(asList(1, 2, 3, 4, 5, 6), result);
+			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
+			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
 			
 			assertTrue(opened.get());
 			assertTrue(closed.get());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 0657ac1..1c17fde 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -74,9 +74,12 @@ public class PartitionMapOperatorTest implements java.io.Serializable {
 					parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName);
 			
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
-			List<Integer> result = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0));
 			
-			assertEquals(asList(1, 2, 3, 4, 5, 6), result);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false);
+			
+			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
+			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
 			
 			assertTrue(opened.get());
 			assertTrue(closed.get());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
index 7c89f13..f836692 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java
index 0f6acbe..34f9137 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.operators;
 
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import static org.junit.Assert.*;
-//CHECKSTYLE.ON: AvoidStarImport
 
 import java.util.ArrayList;
 import java.util.List;
@@ -102,11 +101,14 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
 		try {
 			ExecutionEnvironment env = new CollectionEnvironment();
 
+			@SuppressWarnings("unchecked")
 			DataSet<Tuple2<Integer, Integer>> solInput = env.fromElements(
 					new Tuple2<Integer, Integer>(1, 0),
 					new Tuple2<Integer, Integer>(2, 0),
 					new Tuple2<Integer, Integer>(3, 0),
 					new Tuple2<Integer, Integer>(4, 0));
+			
+			@SuppressWarnings("unchecked")
 			DataSet<Tuple1<Integer>> workInput = env.fromElements(
 					new Tuple1<Integer>(1),
 					new Tuple1<Integer>(2),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java
index adf2d96..c2db7c9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index fe2551d..053b8e2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+@SuppressWarnings("serial")
 public class CoGroupOperatorCollectionTest implements Serializable {
 
 	@Test
@@ -68,12 +69,17 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 			final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0);
 
 			{
-				SumCoGroup udf = new SumCoGroup();
-				List<Tuple2<String, Integer>> result = getCoGroupOperator(udf)
-						.executeOnCollections(input1, input2, ctx);
-
-				Assert.assertTrue(udf.isClosed);
-
+				SumCoGroup udf1 = new SumCoGroup();
+				SumCoGroup udf2 = new SumCoGroup();
+				
+				List<Tuple2<String, Integer>> resultSafe = getCoGroupOperator(udf1)
+						.executeOnCollections(input1, input2, ctx, true);
+				List<Tuple2<String, Integer>> resultRegular = getCoGroupOperator(udf2)
+						.executeOnCollections(input1, input2, ctx, false);
+
+				Assert.assertTrue(udf1.isClosed);
+				Assert.assertTrue(udf2.isClosed);
+				
 				Set<Tuple2<String, Integer>> expected = new HashSet<Tuple2<String, Integer>>(
 						Arrays.asList(new Tuple2Builder<String, Integer>()
 										.add("foo", 8)
@@ -84,14 +90,21 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 						)
 				);
 
-				Assert.assertEquals(expected, new HashSet(result));
+				Assert.assertEquals(expected, new HashSet<Tuple2<String, Integer>>(resultSafe));
+				Assert.assertEquals(expected, new HashSet<Tuple2<String, Integer>>(resultRegular));
 			}
 
 			{
-				List<Tuple2<String, Integer>> result = getCoGroupOperator(new SumCoGroup())
-						.executeOnCollections(Collections.EMPTY_LIST, Collections.EMPTY_LIST, ctx);
-
-				Assert.assertEquals(0, result.size());
+				List<Tuple2<String, Integer>> resultSafe = getCoGroupOperator(new SumCoGroup())
+						.executeOnCollections(Collections.<Tuple2<String, Integer>>emptyList(),
+								Collections.<Tuple2<String, Integer>>emptyList(), ctx, true);
+				
+				List<Tuple2<String, Integer>> resultRegular = getCoGroupOperator(new SumCoGroup())
+						.executeOnCollections(Collections.<Tuple2<String, Integer>>emptyList(),
+								Collections.<Tuple2<String, Integer>>emptyList(), ctx, false);
+
+				Assert.assertEquals(0, resultSafe.size());
+				Assert.assertEquals(0, resultRegular.size());
 			}
 		} catch (Throwable t) {
 			t.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index f77b292..5d1ca17 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -79,15 +79,21 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-			List<Tuple2<String, Integer>> result = op.executeOnCollections(input, null);
-			Set<Tuple2<String, Integer>> resultSet = new HashSet<Tuple2<String, Integer>>(result);
+			
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, null, true);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, false);
+			
+			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
+			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
 
 			Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
 					Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
 					Integer>("bar", 6)));
 
-			assertEquals(expectedResult, resultSet);
-		} catch (Exception e) {
+			assertEquals(expectedResult, resultSetMutableSafe);
+			assertEquals(expectedResult, resultSetRegular);
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -149,15 +155,19 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-			List<Tuple2<String, Integer>> result = op.executeOnCollections(input,
-					new RuntimeUDFContext(taskName, 1, 0));
-			Set<Tuple2<String, Integer>> resultSet = new HashSet<Tuple2<String, Integer>>(result);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false);
+			
+			
+			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
+			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
 
 			Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
 					Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
 					Integer>("bar", 6)));
 
-			assertEquals(expectedResult, resultSet);
+			assertEquals(expectedResult, resultSetMutableSafe);
+			assertEquals(expectedResult, resultSetRegular);
 
 			assertTrue(opened.get());
 			assertTrue(closed.get());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
new file mode 100644
index 0000000..d9abf14
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@SuppressWarnings({ "unchecked", "serial" })
+public class JoinOperatorBaseTest implements Serializable {
+
+	
+	@Test
+	public void testTupleBaseJoiner(){
+		final FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>> joiner =
+					new FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>>()
+		{
+			@Override
+			public void join(Tuple3<String, Double, Integer> first, Tuple2<Integer, String> second, Collector<Tuple2<Double, String>> out) {
+				Tuple3<String, Double, Integer> fst = (Tuple3<String, Double, Integer>)first;
+				Tuple2<Integer, String> snd = (Tuple2<Integer, String>)second;
+
+				assertEquals(fst.f0, snd.f1);
+				assertEquals(fst.f2, snd.f0);
+
+				out.collect(new Tuple2<Double, String>(fst.f1, snd.f0.toString()));
+			}
+		};
+
+		final TupleTypeInfo<Tuple3<String, Double, Integer>> leftTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo
+				(String.class, Double.class, Integer.class);
+		final TupleTypeInfo<Tuple2<Integer, String>> rightTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class,
+				String.class);
+		final TupleTypeInfo<Tuple2<Double, String>> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class,
+				String.class);
+
+		final int[] leftKeys = new int[]{0,2};
+		final int[] rightKeys = new int[]{1,0};
+
+		final String taskName = "Collection based tuple joiner";
+
+		final BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double,
+				String>> binaryOpInfo = new BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer,
+				String>, Tuple2<Double, String>>(leftTypeInfo, rightTypeInfo, outTypeInfo);
+
+		final JoinOperatorBase<Tuple3<String, Double, Integer>, Tuple2<Integer,
+				String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer,
+				String>, Tuple2<Double, String>>> base = new JoinOperatorBase<Tuple3<String, Double, Integer>,
+				Tuple2<Integer, String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>,
+				Tuple2<Integer, String>, Tuple2<Double, String>>>(joiner, binaryOpInfo, leftKeys, rightKeys, taskName);
+
+		final List<Tuple3<String, Double, Integer> > inputData1 = new ArrayList<Tuple3<String, Double,
+				Integer>>(Arrays.asList(
+				new Tuple3<String, Double, Integer>("foo", 42.0, 1),
+				new Tuple3<String,Double, Integer>("bar", 1.0, 2),
+				new Tuple3<String, Double, Integer>("bar", 2.0, 3),
+				new Tuple3<String, Double, Integer>("foobar", 3.0, 4),
+				new Tuple3<String, Double, Integer>("bar", 3.0, 3)
+		));
+
+		final List<Tuple2<Integer, String>> inputData2 = new ArrayList<Tuple2<Integer, String>>(Arrays.asList(
+				new Tuple2<Integer, String>(3, "bar"),
+				new Tuple2<Integer, String>(4, "foobar"),
+				new Tuple2<Integer, String>(2, "foo")
+		));
+		final Set<Tuple2<Double, String>> expected = new HashSet<Tuple2<Double, String>>(Arrays.asList(
+				new Tuple2<Double, String>(2.0, "3"),
+				new Tuple2<Double, String>(3.0, "3"),
+				new Tuple2<Double, String>(3.0, "4")
+		));
+
+		try {
+			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), true);
+			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), false);
+
+			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultSafe));
+			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultRegular));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index a4de40b..2baf57e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -68,15 +68,20 @@ public class ReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-			List<Tuple2<String, Integer>> result = op.executeOnCollections(input, null);
-			Set<Tuple2<String, Integer>> resultSet = new HashSet<Tuple2<String, Integer>>(result);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, null, true);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, false);
+			
+			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
+			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
 
 			Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
 					Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
 					Integer>("bar", 6)));
 
-			assertEquals(expectedResult, resultSet);
-		} catch (Exception e) {
+			assertEquals(expectedResult, resultSetMutableSafe);
+			assertEquals(expectedResult, resultSetRegular);
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -127,20 +132,23 @@ public class ReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-			List<Tuple2<String, Integer>> result = op.executeOnCollections(input,
-					new RuntimeUDFContext(taskName, 1, 0));
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false);
 
-			Set<Tuple2<String, Integer>> resultSet = new HashSet<Tuple2<String, Integer>>(result);
+			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
+			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
 
 			Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
 					Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
 					Integer>("bar", 6)));
 
-			assertEquals(expectedResult, resultSet);
+			assertEquals(expectedResult, resultSetMutableSafe);
+			assertEquals(expectedResult, resultSetRegular);
 
 			assertTrue(opened.get());
 			assertTrue(closed.get());
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac69cb3e/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java
deleted file mode 100644
index f332832..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators.base;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class JoinOperatorBaseTest implements Serializable {
-
-	@Test
-	public void testTupleBaseJoiner(){
-		final FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer,
-				String>, Tuple2<Double, String>> joiner = new FlatJoinFunction() {
-			@Override
-			public void join(Object first, Object second, Collector out) throws Exception {
-				Tuple3<String, Double, Integer> fst = (Tuple3<String, Double, Integer>)first;
-				Tuple2<Integer, String> snd = (Tuple2<Integer, String>)second;
-
-				assertEquals(fst.f0, snd.f1);
-				assertEquals(fst.f2, snd.f0);
-
-				out.collect(new Tuple2<Double, String>(fst.f1, snd.f0.toString()));
-			}
-		};
-
-		final TupleTypeInfo<Tuple3<String, Double, Integer>> leftTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo
-				(String.class, Double.class, Integer.class);
-		final TupleTypeInfo<Tuple2<Integer, String>> rightTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class,
-				String.class);
-		final TupleTypeInfo<Tuple2<Double, String>> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class,
-				String.class);
-
-		final int[] leftKeys = new int[]{0,2};
-		final int[] rightKeys = new int[]{1,0};
-
-		final String taskName = "Collection based tuple joiner";
-
-		final BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double,
-				String>> binaryOpInfo = new BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer,
-				String>, Tuple2<Double, String>>(leftTypeInfo, rightTypeInfo, outTypeInfo);
-
-		final JoinOperatorBase<Tuple3<String, Double, Integer>, Tuple2<Integer,
-				String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer,
-				String>, Tuple2<Double, String>>> base = new JoinOperatorBase<Tuple3<String, Double, Integer>,
-				Tuple2<Integer, String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>,
-				Tuple2<Integer, String>, Tuple2<Double, String>>>(joiner, binaryOpInfo, leftKeys, rightKeys, taskName);
-
-		final List<Tuple3<String, Double, Integer> > inputData1 = new ArrayList<Tuple3<String, Double,
-				Integer>>(Arrays.asList(
-				new Tuple3<String, Double, Integer>("foo", 42.0, 1),
-				new Tuple3<String,Double, Integer>("bar", 1.0, 2),
-				new Tuple3<String, Double, Integer>("bar", 2.0, 3),
-				new Tuple3<String, Double, Integer>("foobar", 3.0, 4),
-				new Tuple3<String, Double, Integer>("bar", 3.0, 3)
-		));
-
-		final List<Tuple2<Integer, String>> inputData2 = new ArrayList<Tuple2<Integer, String>>(Arrays.asList(
-				new Tuple2<Integer, String>(3, "bar"),
-				new Tuple2<Integer, String>(4, "foobar"),
-				new Tuple2<Integer, String>(2, "foo")
-		));
-		final Set<Tuple2<Double, String>> expected = new HashSet<Tuple2<Double, String>>(Arrays.asList(
-				new Tuple2<Double, String>(2.0, "3"),
-				new Tuple2<Double, String>(3.0, "3"),
-				new Tuple2<Double, String>(3.0, "4")
-		));
-
-		try {
-			Method executeOnCollections = base.getClass().getDeclaredMethod("executeOnCollections", List.class,
-					List.class, RuntimeContext.class);
-			executeOnCollections.setAccessible(true);
-
-			Object result = executeOnCollections.invoke(base, inputData1, inputData2, null);
-
-			assertEquals(expected, new HashSet<Tuple2<Double, String>>((List<Tuple2<Double, String>>)result));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-
-	}
-}


Mime
View raw message