flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [12/13] flink git commit: [FLINK-1285] Various cleanup of object reusing and non-reusing code.
Date Thu, 08 Jan 2015 10:59:07 GMT
[FLINK-1285] Various cleanup of object reusing and non-reusing code.

 - The map driver now also supports both the object reusing and the non-reusing code path

 - In the merge iterator (merges sorted runs from external sort), we now always use the non-reusing code path,
   because the reusing code path here always requires additional instances to be held concurrently and copies
   between elements, which voids the benefits of reusing elements.

 - For many utility iterators (in test cases), consolidates the logic between the two variants of the "next()"
   functions (one calls the other, where possible)

 - Eliminates a few copies between elements in the non-reusing parts (where possible)

 - Removes unused variables in the non-reusing variants (mainly serializers previously used to create instances
   or copy between instances)

 - Removes some unused types

 - Improves generic type safety (fewer raw types)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26c9819e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26c9819e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26c9819e

Branch: refs/heads/master
Commit: 26c9819edce13ab13d3789e5f825b7a31a12a987
Parents: d529749
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Jan 7 21:43:50 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jan 7 21:43:50 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/common/Plan.java  |  10 +-
 .../flink/util/MutableObjectIterator.java       |  23 +-
 .../BroadcastVariableMaterialization.java       |   5 +-
 .../iterative/io/HashPartitionIterator.java     |   3 +-
 .../CoGroupWithSolutionSetFirstDriver.java      |   2 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |   3 +-
 .../operators/GroupReduceCombineDriver.java     |   8 +-
 .../runtime/operators/GroupReduceDriver.java    |   3 +-
 .../flink/runtime/operators/MapDriver.java      |  18 +-
 .../runtime/operators/RegularPactTask.java      |   4 +-
 .../SynchronousChainedCombineDriver.java        |   3 +-
 .../operators/hash/CompactingHashTable.java     |  19 +-
 .../runtime/operators/hash/HashPartition.java   |  15 +-
 .../sort/CombiningUnilateralSortMerger.java     |   4 +-
 .../runtime/operators/sort/MergeIterator.java   |  72 ++-----
 .../sort/NonReusingMergeMatchIterator.java      |   7 +-
 .../operators/sort/UnilateralSortMerger.java    |   6 +-
 .../runtime/operators/util/ReaderIterator.java  |  14 +-
 .../plugable/DeserializationDelegate.java       |   1 +
 .../util/EmptyMutableObjectIterator.java        |   4 -
 .../util/KeyGroupedMutableObjectIterator.java   | 208 -------------------
 .../util/NonReusingKeyGroupedIterator.java      |  15 +-
 ...nReusingMutableToRegularIteratorWrapper.java |   7 +-
 .../util/RegularToMutableObjectIterator.java    |   4 +-
 .../runtime/instance/InstanceManagerTest.java   |   6 +-
 .../runtime/operators/hash/HashTableITCase.java |  39 +---
 .../sort/MassiveStringSortingITCase.java        |  11 +-
 .../operators/sort/MergeIteratorTest.java       |  35 +---
 .../NonReusingSortMergeMatchIteratorITCase.java |   8 +-
 .../ReusingSortMergeMatchIteratorITCase.java    |   8 +-
 .../operators/testutils/DriverTestBase.java     |   2 +-
 .../operators/testutils/MockEnvironment.java    |   1 -
 .../testutils/MutableObjectIteratorWrapper.java |  15 +-
 .../testutils/RandomIntPairGenerator.java       |   5 +-
 .../runtime/operators/testutils/TestData.java   |  50 ++---
 .../testutils/UniformIntPairGenerator.java      |  30 +--
 .../testutils/UniformRecordGenerator.java       |  38 +---
 .../testutils/UniformStringPairGenerator.java   |  41 +---
 .../operators/testutils/types/StringPair.java   |   3 +-
 .../util/NonReusingKeyGroupedIteratorTest.java  |   4 +-
 40 files changed, 159 insertions(+), 595 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index 4a975d2..fb50742 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -285,16 +285,18 @@ public class Plan implements Visitable<Operator<?>> {
 	}
 
 	/**
-	 * Sets the runtime config object.
-	 * @return
+	 * Gets the execution config object.
+	 * 
+	 * @return The execution config object.
 	 */
 	public ExecutionConfig getExecutionConfig() {
 		return executionConfig;
 	}
 
 	/**
-	 * Gets the runtime config object.
-	 * @param executionConfig
+	 * Sets the runtime config object defining execution parameters.
+	 * 
+	 * @param executionConfig The execution config to use.
 	 */
 	public void setExecutionConfig(ExecutionConfig executionConfig) {
 		this.executionConfig = executionConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java b/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
index ea5ed78..d0d393e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
@@ -20,11 +20,15 @@ package org.apache.flink.util;
 import java.io.IOException;
 
 /**
- * A simple iterator interface. The key differences to the {@link java.util.Iterator} are that this
- * iterator also as a <code>next()</code> method that </code>accepts an object into which it can
- * place the content if the object is mutable, and that it consolidates the logic in a single
- * <code>next()</code> function, rather than in two different functions such as
- * <code>hasNext()</code> and <code>next()</code>.
+ * A simple iterator interface. The key differences to the {@link java.util.Iterator} are
+ * 
+ * <ul>
+ *   <li>It has two distinct <code>next()</code>, where one variant allows to pass an object that may
+ *       be reused, if the type is mutable.</li>
+ *   <li>It consolidates the logic in a single <code>next()</code> function, rather than
+ *       splitting it over two different functions such as <code>hasNext()</code> and <code>next()</code>
+ *       </li>
+ * </ul>
  * 
  * @param <E> The element type of the collection iterated over.
  */
@@ -32,10 +36,10 @@ public interface MutableObjectIterator<E> {
 	
 	/**
 	 * Gets the next element from the collection. The contents of that next element is put into the
-	 * given target object.
+	 * given reuse object, if the type is mutable.
 	 * 
 	 * @param reuse The target object into which to place next element if E is mutable.
-	 * @return The filled object or <code>null</code> if the iterator is exhausted
+	 * @return The filled object or <code>null</code> if the iterator is exhausted.
 	 * 
 	 * @throws IOException Thrown, if a problem occurred in the underlying I/O layer or in the 
 	 *                     serialization / deserialization logic
@@ -43,9 +47,10 @@ public interface MutableObjectIterator<E> {
 	public E next(E reuse) throws IOException;
 
 	/**
-	 * Gets the next element from the collection. The reader must create a new instance itself.
+	 * Gets the next element from the collection. The iterator implementation
+	 * must obtain a new instance.
 	 *
-	 * @return The object or <code>null</code> if the iterator is exhausted
+	 * @return The object or <code>null</code> if the iterator is exhausted.
 	 *
 	 * @throws IOException Thrown, if a problem occurred in the underlying I/O layer or in the
 	 *                     serialization / deserialization logic

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
index 5b5f2f2..cd6223c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,11 +96,11 @@ public class BroadcastVariableMaterialization<T, C> {
 
 		try {
 			@SuppressWarnings("unchecked")
-			final MutableReader typedReader = (MutableReader) reader;
+			final MutableReader<DeserializationDelegate<T>> typedReader = (MutableReader<DeserializationDelegate<T>>) reader;
+			
 			@SuppressWarnings("unchecked")
 			final TypeSerializer<T> serializer = ((TypeSerializerFactory<T>) serializerFactory).getSerializer();
 
-			@SuppressWarnings("unchecked")
 			final ReaderIterator<T> readerIterator = new ReaderIterator<T>(typedReader, serializer);
 			
 			if (materializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
index 209fb79..93ae55f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.io;
 
 import java.io.EOFException;
@@ -28,7 +27,7 @@ import org.apache.flink.runtime.operators.hash.HashPartition;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
- * {@link Iterator} over the buildside entries of a {@link HashPartition}
+ * {@link Iterator} over the build side entries of a {@link HashPartition}
  * 
  * @param <BT>
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index a3c69a3..b27b6b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -201,7 +201,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 				}
 			}
 		} else {
-			final NonReusingKeyGroupedIterator<IT2> probeSideInput = new NonReusingKeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideSerializer, probeSideComparator);
+			final NonReusingKeyGroupedIterator<IT2> probeSideInput = new NonReusingKeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideComparator);
 			if (this.hashTable != null) {
 				final CompactingHashTable<IT1> join = hashTable;
 				final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index 17fc471..ba0f8f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -200,7 +200,8 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 				}
 			}
 		} else {
-			final NonReusingKeyGroupedIterator<IT1> probeSideInput = new NonReusingKeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideSerializer, probeSideComparator);
+			final NonReusingKeyGroupedIterator<IT1> probeSideInput = 
+					new NonReusingKeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideComparator);
 
 			if (this.hashTable != null) {
 				final CompactingHashTable<IT2> join = hashTable;

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 8d8d5dc..be0c9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -177,8 +177,8 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 
-				final ReusingKeyGroupedIterator<T> keyIter = new ReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, this.groupingComparator);
-
+				final ReusingKeyGroupedIterator<T> keyIter = 
+						new ReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, this.groupingComparator);
 
 				final FlatCombineFunction<T> combiner = this.combiner;
 				final Collector<T> output = this.output;
@@ -192,8 +192,8 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 
-				final NonReusingKeyGroupedIterator<T> keyIter = new NonReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, this.groupingComparator);
-
+				final NonReusingKeyGroupedIterator<T> keyIter = 
+						new NonReusingKeyGroupedIterator<T>(sorter.getIterator(), this.groupingComparator);
 
 				final FlatCombineFunction<T> combiner = this.combiner;
 				final Collector<T> output = this.output;

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 9d9f994..211622c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -120,7 +119,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
 			}
 		}
 		else {
-			final NonReusingKeyGroupedIterator<IT> iter = new NonReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
+			final NonReusingKeyGroupedIterator<IT> iter = new NonReusingKeyGroupedIterator<IT>(this.input, this.comparator);
 			// run stub implementation
 			while (this.running && iter.nextKey()) {
 				stub.reduce(iter.getValues(), output);

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index d6679dd..d750fd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -84,10 +83,19 @@ public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
 		final MapFunction<IT, OT> function = this.taskContext.getStub();
 		final Collector<OT> output = this.taskContext.getOutputCollector();
 
-		IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
-
-		while (this.running && ((record = input.next(record)) != null)) {
-			output.collect(function.map(record));
+		if (objectReuseEnabled) {
+			IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
+	
+			while (this.running && ((record = input.next(record)) != null)) {
+				output.collect(function.map(record));
+			}
+		}
+		else {
+			IT record = null;
+			
+			while (this.running && ((record = input.next()) != null)) {
+				output.collect(function.map(record));
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index db36f6d..34aaced 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1088,9 +1088,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			} else {
 				return new ExecutionConfig();
 			}
-		} catch (IOException e) {
-			throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e);
-		} catch (ClassNotFoundException e) {
+		} catch (Exception e) {
 			throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 65426ef..dde6fe6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.chaining;
 
 import java.io.IOException;
@@ -213,7 +212,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 				// run the combiner
-				final NonReusingKeyGroupedIterator<T> keyIter = new NonReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, this.groupingComparator);
+				final NonReusingKeyGroupedIterator<T> keyIter = new NonReusingKeyGroupedIterator<T>(sorter.getIterator(), this.groupingComparator);
 
 
 				// cache references on the stack

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 3dd400f..301aa82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -1220,28 +1220,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 
 		@Override
 		public T next(T reuse) throws IOException {
-			if(done || this.table.closed.get()) {
-				return null;
-			} else if(!cache.isEmpty()) {
-				reuse = cache.remove(cache.size()-1);
-				return reuse;
-			} else {
-				while(!done && cache.isEmpty()) {
-					done = !fillCache();
-				}
-				if(!done) {
-					reuse = cache.remove(cache.size()-1);
-					return reuse;
-				} else {
-					return null;
-				}
-			}
+			return next();
 		}
 
 		@Override
 		public T next() throws IOException {
-			// This is just a copy of the above, I wanted to keep the two separate,
-			// in case we change something later. Plus, it keeps the diff clean... :D
 			if(done || this.table.closed.get()) {
 				return null;
 			} else if(!cache.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 23a415d..14e4ae6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -44,8 +44,8 @@ import org.apache.flink.util.MutableObjectIterator;
 
 /**
  * 
- * @tparam BT The type of the build side records.
- * @tparam PT The type of the probe side records.
+ * @param <BT> The type of the build side records.
+ * @param <PT> The type of the probe side records.
  */
 public class HashPartition<BT, PT> extends AbstractPagedInputView implements SeekableDataInputView
 {
@@ -637,19 +637,12 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			}
 		}
 
-
-		protected final long getPointer()
-		{
+		protected final long getPointer() {
 			return this.currentPointer;
 		}
 		
-		protected final int getCurrentHashCode()
-		{
+		protected final int getCurrentHashCode() {
 			return this.currentHashCode;
 		}
 	}
-
-	
-
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 4f15abf..5d4c881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -243,10 +243,10 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				}
 				disposeSortBuffers(true);
 				
-				// set lazy iterator
+				// set result iterator
 				MutableObjectIterator<E> resIter = iterators.isEmpty() ? EmptyMutableObjectIterator.<E>get() :
 						iterators.size() == 1 ? iterators.get(0) : 
-						new MergeIterator<E>(iterators,	this.serializer, this.comparator);
+						new MergeIterator<E>(iterators, this.comparator);
 				
 				setResultIterator(resIter);
 				return;

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
index f3dc50e..759e0e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
@@ -41,73 +40,49 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
 	
 	private final PartialOrderPriorityQueue<HeadStream<E>> heap;	// heap over the head elements of the stream
 	
-	private final TypeSerializer<E> serializer;
-	
 	/**
 	 * @param iterators
-	 * @param serializer
 	 * @param comparator
 	 * @throws IOException
 	 */
-	public MergeIterator(List<MutableObjectIterator<E>> iterators,
-			TypeSerializer<E> serializer, TypeComparator<E> comparator)
-	throws IOException
-	{
+	public MergeIterator(List<MutableObjectIterator<E>> iterators, TypeComparator<E> comparator) throws IOException {
 		this.heap = new PartialOrderPriorityQueue<HeadStream<E>>(new HeadStreamComparator<E>(), iterators.size());
-		this.serializer = serializer;
 		
 		for (MutableObjectIterator<E> iterator : iterators) {
-			this.heap.add(new HeadStream<E>(iterator, serializer, comparator.duplicate()));
+			this.heap.add(new HeadStream<E>(iterator, comparator.duplicate()));
 		}
 	}
 
 	/**
 	 * Gets the next smallest element, with respect to the definition of order implied by
-	 * the {@link TypeSerializer} provided to this iterator.
+	 * the {@link TypeSerializer} provided to this iterator. This method does in fact not
+	 * reuse the given element (which would here imply potentially expensive copying), 
+	 * but always returns a new element.
 	 * 
-	 * @param reuse The object into which the result is put. The contents of the reuse object
-	 *               is only valid after this method, if the method returned true. Otherwise
-	 *               the contents is undefined.
-	 * @return True, if the iterator had another element, false otherwise. 
+	 * @param reuse Ignored.
+	 * @return The next smallest element, or null, if the iterator is exhausted. 
 	 * 
 	 * @see org.apache.flink.util.MutableObjectIterator#next(java.lang.Object)
 	 */
 	@Override
-	public E next(E reuse) throws IOException
-	{
-		if (this.heap.size() > 0) {
-			// get the smallest element
-			final HeadStream<E> top = this.heap.peek();
-			reuse = this.serializer.copy(top.getHead(), reuse);
-			
-			// read an element
-			if (!top.nextHead()) {
-				this.heap.poll();
-			} else {
-				this.heap.adjustTop();
-			}
-			return reuse;
-		}
-		else {
-			return null;
-		}
+	public E next(E reuse) throws IOException {
+		return next();
 	}
 
 	/**
 	 * Gets the next smallest element, with respect to the definition of order implied by
 	 * the {@link TypeSerializer} provided to this iterator.
 	 *
-	 * @return True, if the iterator had another element, false otherwise.
+	 * @return The next element if the iterator has another element, null otherwise.
 	 *
-	 * @see org.apache.flink.util.MutableObjectIterator#next(java.lang.Object)
+	 * @see org.apache.flink.util.MutableObjectIterator#next()
 	 */
 	@Override
-	public E next() throws IOException
-	{
+	public E next() throws IOException {
 		if (this.heap.size() > 0) {
 			// get the smallest element
 			final HeadStream<E> top = this.heap.peek();
-			E result = this.serializer.copy(top.getHead());
+			E result = top.getHead();
 
 			// read an element
 			if (!top.nextHead()) {
@@ -126,20 +101,17 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
 	//                      Internal Classes that wrap the sorted input streams
 	// ============================================================================================
 	
-	private static final class HeadStream<E>
-	{
+	private static final class HeadStream<E> {
+		
 		private final MutableObjectIterator<E> iterator;
 		
 		private final TypeComparator<E> comparator;
 		
 		private E head;
 
-		public HeadStream(MutableObjectIterator<E> iterator, TypeSerializer<E> serializer, TypeComparator<E> comparator)
-		throws IOException
-		{
+		public HeadStream(MutableObjectIterator<E> iterator, TypeComparator<E> comparator) throws IOException {
 			this.iterator = iterator;
 			this.comparator = comparator;
-			this.head = serializer.createInstance();
 			
 			if (!nextHead()) {
 				throw new IllegalStateException();
@@ -150,9 +122,8 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
 			return this.head;
 		}
 
-		public boolean nextHead() throws IOException
-		{
-			if ((this.head = this.iterator.next(this.head)) != null) {
+		public boolean nextHead() throws IOException {
+			if ((this.head = this.iterator.next()) != null) {
 				this.comparator.setReference(this.head);
 				return true;
 			}
@@ -164,11 +135,10 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
 
 	// --------------------------------------------------------------------------------------------
 	
-	private static final class HeadStreamComparator<E> implements Comparator<HeadStream<E>>
-	{		
+	private static final class HeadStreamComparator<E> implements Comparator<HeadStream<E>> {
+		
 		@Override
-		public int compare(HeadStream<E> o1, HeadStream<E> o2)
-		{
+		public int compare(HeadStream<E> o1, HeadStream<E> o2) {
 			return o2.comparator.compareToReference(o1.comparator);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
index 70b6f9a..c89b5c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
@@ -40,7 +40,6 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-
 /**
  * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
  * matching through a sort-merge join strategy.
@@ -64,7 +63,7 @@ public class NonReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator
 
 	private final TypeSerializer<T2> serializer2;
 
-	private final NonReusingBlockResettableIterator<T2> blockIt;				// for N:M cross products with same key
+	private final NonReusingBlockResettableIterator<T2> blockIt;	// for N:M cross products with same key
 
 	private final List<MemorySegment> memoryForSpillingIterator;
 
@@ -97,8 +96,8 @@ public class NonReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator
 		this.memoryManager = memoryManager;
 		this.ioManager = ioManager;
 
-		this.iterator1 = new NonReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate());
-		this.iterator2 = new NonReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate());
+		this.iterator1 = new NonReusingKeyGroupedIterator<T1>(input1, comparator1.duplicate());
+		this.iterator2 = new NonReusingKeyGroupedIterator<T2>(input2, comparator2.duplicate());
 
 		final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
 		this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 459ef82..dabf9bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1250,13 +1250,13 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				// set lazy iterator
 				setResultIterator(iterators.isEmpty() ? EmptyMutableObjectIterator.<E>get() :
 						iterators.size() == 1 ? iterators.get(0) : 
-						new MergeIterator<E>(iterators,	this.serializer, this.comparator));
+						new MergeIterator<E>(iterators, this.comparator));
 				return;
 			}			
 			
 			// ------------------- Spilling Phase ------------------------
 			
-			final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();			
+			final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
 			List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
 
 			
@@ -1430,7 +1430,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				iterators.add(new ChannelReaderInputViewIterator<E>(inView, null, this.serializer));
 			}
 
-			return new MergeIterator<E>(iterators, this.serializer, this.comparator);
+			return new MergeIterator<E>(iterators, this.comparator);
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java
index 606c50c..85e36a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.util;
 
 import java.io.IOException;
@@ -28,13 +27,16 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
- * A {@link MutableObjectIterator} that wraps a Nephele Reader producing records of a certain type.
+ * A {@link MutableObjectIterator} that wraps a reader from an input channel and
+ * produces the reader's records.
+ * 
+ * The reader supports reading objects with possible reuse of mutable types, and
+ * without reuse of mutable types.
  */
 public final class ReaderIterator<T> implements MutableObjectIterator<T> {
 	
-	private final MutableReader reader;		// the source
+	private final MutableReader<DeserializationDelegate<T>> reader;   // the source
 	
 	private final ReusingDeserializationDelegate<T> reusingDelegate;
 	private final NonReusingDeserializationDelegate<T> nonReusingDelegate;
@@ -51,7 +53,6 @@ public final class ReaderIterator<T> implements MutableObjectIterator<T> {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public T next(T target) throws IOException {
 		this.reusingDelegate.setInstance(target);
 		try {
@@ -60,7 +61,6 @@ public final class ReaderIterator<T> implements MutableObjectIterator<T> {
 			} else {
 				return null;
 			}
-
 		}
 		catch (InterruptedException e) {
 			throw new IOException("Reader interrupted.", e);
@@ -68,7 +68,6 @@ public final class ReaderIterator<T> implements MutableObjectIterator<T> {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public T next() throws IOException {
 		try {
 			if (this.reader.next(this.nonReusingDelegate)) {
@@ -76,7 +75,6 @@ public final class ReaderIterator<T> implements MutableObjectIterator<T> {
 			} else {
 				return null;
 			}
-
 		}
 		catch (InterruptedException e) {
 			throw new IOException("Reader interrupted.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java
index 9ca5954..fe941a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.plugable;
 import org.apache.flink.core.io.IOReadableWritable;
 
 public interface DeserializationDelegate<T> extends IOReadableWritable {
+	
 	void setInstance(T instance);
 
 	T getInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java
index 12ae5c1..ae3f814 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java
@@ -16,15 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  * An empty mutable object iterator that never returns anything.
- *
  */
 public final class EmptyMutableObjectIterator<E> implements MutableObjectIterator<E> {
 
@@ -64,5 +61,4 @@ public final class EmptyMutableObjectIterator<E> implements MutableObjectIterato
 	public E next() {
 		return null;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
deleted file mode 100644
index c139aca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
- * A sub-iterator over all values with the same key is provided.
- */
-public final class KeyGroupedMutableObjectIterator<E> {
-	
-	private final MutableObjectIterator<E> iterator;
-	
-	private final TypeSerializer<E> serializer;
-	
-	private final TypeComparator<E> comparator;
-	
-	private E next;
-
-	private ValuesIterator valuesIterator;
-
-	private boolean nextIsFresh;
-
-	/**
-	 * Initializes the KeyGroupedIterator. It requires an iterator which returns its result
-	 * sorted by the key fields.
-	 * 
-	 * @param iterator An iterator over records, which are sorted by the key fields, in any order.
-	 * @param serializer The serializer for the data type iterated over.
-	 * @param comparator The comparator for the data type iterated over.
-	 */
-	public KeyGroupedMutableObjectIterator(MutableObjectIterator<E> iterator,
-			TypeSerializer<E> serializer, TypeComparator<E> comparator)
-	{
-		if (iterator == null || serializer == null || comparator == null) {
-			throw new NullPointerException();
-		}
-		
-		this.iterator = iterator;
-		this.serializer = serializer;
-		this.comparator = comparator;
-	}
-
-	/**
-	 * Moves the iterator to the next key. This method may skip any values that have not yet been returned by the
-	 * iterator created by the {@link #getValues()} method. Hence, if called multiple times it "removes" pairs.
-	 * 
-	 * @return true if the input iterator has an other group of key-value pairs that share the same key.
-	 */
-	public boolean nextKey() throws IOException
-	{
-		// first element
-		if (this.next == null) {
-			this.next = this.serializer.createInstance();
-			if ((this.next = this.iterator.next(this.next)) != null) {
-				this.comparator.setReference(this.next);
-				this.nextIsFresh = false;
-				this.valuesIterator = new ValuesIterator();
-				this.valuesIterator.nextIsUnconsumed = true;
-				return true;
-			} else {
-				// empty input, set everything null
-				this.valuesIterator = null;
-				return false;
-			}
-		}
-
-		// Whole value-iterator was read and a new key is available.
-		if (this.nextIsFresh) {
-			this.nextIsFresh = false;
-			this.comparator.setReference(this.next);
-			this.valuesIterator.nextIsUnconsumed = true;
-			return true;
-		}
-
-		// try to move to next key.
-		// Required if user code / reduce() method did not read the whole value iterator.
-		while (true) {
-			if ((this.next = this.iterator.next(this.next)) != null) {
-				if (!this.comparator.equalToReference(this.next)) {
-					// the keys do not match, so we have a new group. store the current keys
-					this.comparator.setReference(this.next);						
-					this.nextIsFresh = false;
-					this.valuesIterator.nextIsUnconsumed = true;
-					return true;
-				}
-			}
-			else {
-				this.valuesIterator = null;
-				return false;
-			}
-		}
-	}
-
-	/**
-	 * Returns an iterator over all values that belong to the current key. The iterator is initially <code>null</code>
-	 * (before the first call to {@link #nextKey()} and after all keys are consumed. In general, this method returns
-	 * always a non-null value, if a previous call to {@link #nextKey()} return <code>true</code>.
-	 * 
-	 * @return Iterator over all values that belong to the current key.
-	 */
-	public MutableObjectIterator<E> getValues() {
-		return this.valuesIterator;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	private final class ValuesIterator implements MutableObjectIterator<E>
-	{
-		private final TypeSerializer<E> serializer = KeyGroupedMutableObjectIterator.this.serializer;
-		private final TypeComparator<E> comparator = KeyGroupedMutableObjectIterator.this.comparator; 
-		
-		private boolean nextIsUnconsumed = false;
-
-		@Override
-		public E next(E target)
-		{
-			if (KeyGroupedMutableObjectIterator.this.next == null || KeyGroupedMutableObjectIterator.this.nextIsFresh) {
-				return null;
-			}
-			if (this.nextIsUnconsumed) {
-				return this.serializer.copy(KeyGroupedMutableObjectIterator.this.next, target);
-			}
-			
-			try {
-				if ((target = KeyGroupedMutableObjectIterator.this.iterator.next(target)) != null) {
-					// check whether the keys are equal
-					if (!this.comparator.equalToReference(target)) {
-						// moved to the next key, no more values here
-						KeyGroupedMutableObjectIterator.this.next =
-								this.serializer.copy(target, KeyGroupedMutableObjectIterator.this.next);
-						KeyGroupedMutableObjectIterator.this.nextIsFresh = true;
-						return null;
-					}
-					// same key, next value is in "next"
-					return target;
-				}
-				else {
-					// backing iterator is consumed
-					KeyGroupedMutableObjectIterator.this.next = null;
-					return null;
-				}
-			}
-			catch (IOException ioex) {
-				throw new RuntimeException("An error occurred while reading the next record: " + 
-					ioex.getMessage(), ioex);
-			}
-		}
-
-		@Override
-		public E next()
-		{
-			if (KeyGroupedMutableObjectIterator.this.next == null || KeyGroupedMutableObjectIterator.this.nextIsFresh) {
-				return null;
-			}
-			if (this.nextIsUnconsumed) {
-				return this.serializer.copy(KeyGroupedMutableObjectIterator.this.next);
-			}
-
-			E result = null;
-			try {
-				if ((result = KeyGroupedMutableObjectIterator.this.iterator.next()) != null) {
-					// check whether the keys are equal
-					if (!this.comparator.equalToReference(result)) {
-						// moved to the next key, no more values here
-						KeyGroupedMutableObjectIterator.this.next =
-								this.serializer.copy(result);
-						KeyGroupedMutableObjectIterator.this.nextIsFresh = true;
-						return null;
-					}
-					// same key, next value is in "next"
-					return result;
-				}
-				else {
-					// backing iterator is consumed
-					KeyGroupedMutableObjectIterator.this.next = null;
-					return null;
-				}
-			}
-			catch (IOException ioex) {
-				throw new RuntimeException("An error occurred while reading the next record: " +
-						ioex.getMessage(), ioex);
-			}
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
index 43d9bde..3f28cfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
@@ -23,18 +23,15 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.TraversableOnceException;
+
 /**
- * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
- * 
+ * The key grouped iterator returns a key and all values that share the same key.
  */
 public final class NonReusingKeyGroupedIterator<E> {
 	
 	private final MutableObjectIterator<E> iterator;
-
-	private final TypeSerializer<E> serializer;
 	
 	private final TypeComparator<E> comparator;
 	
@@ -51,18 +48,14 @@ public final class NonReusingKeyGroupedIterator<E> {
 	 * sorted by the key fields.
 	 * 
 	 * @param iterator An iterator over records, which are sorted by the key fields, in any order.
-	 * @param serializer The serializer for the data type iterated over.
 	 * @param comparator The comparator for the data type iterated over.
 	 */
-	public NonReusingKeyGroupedIterator(MutableObjectIterator<E> iterator, TypeSerializer<E>
-			serializer, TypeComparator<E> comparator)
-	{
-		if (iterator == null || serializer == null || comparator == null) {
+	public NonReusingKeyGroupedIterator(MutableObjectIterator<E> iterator, TypeComparator<E> comparator) {
+		if (iterator == null || comparator == null) {
 			throw new NullPointerException();
 		}
 		
 		this.iterator = iterator;
-		this.serializer = serializer;
 		this.comparator = comparator;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingMutableToRegularIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingMutableToRegularIteratorWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingMutableToRegularIteratorWrapper.java
index 0db1670..c564c93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingMutableToRegularIteratorWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingMutableToRegularIteratorWrapper.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -41,13 +40,9 @@ public class NonReusingMutableToRegularIteratorWrapper<T> implements Iterator<T>
 
 	private boolean iteratorAvailable = true;
 
-	private TypeSerializer<T> serializer;
-
-	public NonReusingMutableToRegularIteratorWrapper(MutableObjectIterator<T> source,
-			TypeSerializer<T> serializer) {
+	public NonReusingMutableToRegularIteratorWrapper(MutableObjectIterator<T> source, TypeSerializer<T> serializer) {
 		this.source = source;
 		this.current = null;
-		this.serializer = serializer;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java
index 8eb17c4..ffcddc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.util.Iterator;
@@ -24,7 +23,6 @@ import java.util.Iterator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 public class RegularToMutableObjectIterator<T> implements MutableObjectIterator<T> {
 
 	private final Iterator<T> iterator;
@@ -51,7 +49,7 @@ public class RegularToMutableObjectIterator<T> implements MutableObjectIterator<
 	@Override
 	public T next() {
 		if (this.iterator.hasNext()) {
-			return this.serializer.copy(this.iterator.next());
+			return this.iterator.next();
 		} else {
 			return null;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 8a89503..dff3dd3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -84,9 +84,9 @@ public class InstanceManagerTest{
 			final JavaTestKit probe2 = new JavaTestKit(system);
 			final JavaTestKit probe3 = new JavaTestKit(system);
 
-			InstanceID i1 = cm.registerTaskManager(probe1.getRef(), ici1, hardwareDescription, 1);
-			InstanceID i2 = cm.registerTaskManager(probe2.getRef(), ici2, hardwareDescription, 2);
-			InstanceID i3 = cm.registerTaskManager(probe3.getRef(), ici3, hardwareDescription, 5);
+			cm.registerTaskManager(probe1.getRef(), ici1, hardwareDescription, 1);
+			cm.registerTaskManager(probe2.getRef(), ici2, hardwareDescription, 2);
+			cm.registerTaskManager(probe3.getRef(), ici3, hardwareDescription, 5);
 			
 			assertEquals(3, cm.getNumberOfRegisteredTaskManagers());
 			assertEquals(8, cm.getTotalNumberOfSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 27ece69..c6e3062 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -1406,15 +1406,14 @@ public class HashTableITCase {
 	/**
 	 * An iterator that returns the Key/Value pairs with identical value a given number of times.
 	 */
-	public static final class ConstantsKeyValuePairsIterator implements MutableObjectIterator<Record>
-	{
+	public static final class ConstantsKeyValuePairsIterator implements MutableObjectIterator<Record> {
+		
 		private final IntValue key;
 		private final IntValue value;
 		
 		private int numLeft;
 		
-		public ConstantsKeyValuePairsIterator(int key, int value, int count)
-		{
+		public ConstantsKeyValuePairsIterator(int key, int value, int count) {
 			this.key = new IntValue(key);
 			this.value = new IntValue(value);
 			this.numLeft = count;
@@ -1436,16 +1435,7 @@ public class HashTableITCase {
 
 		@Override
 		public Record next() {
-			if (this.numLeft > 0) {
-				this.numLeft--;
-				Record result = new Record(2);
-				result.setField(0, this.key);
-				result.setField(1, this.value);
-				return result;
-			}
-			else {
-				return null;
-			}
+			return next(new Record(2));
 		}
 	}
 	
@@ -1454,15 +1444,14 @@ public class HashTableITCase {
 	/**
 	 * An iterator that returns the Key/Value pairs with identical value a given number of times.
 	 */
-	private static final class ConstantsIntPairsIterator implements MutableObjectIterator<IntPair>
-	{
+	private static final class ConstantsIntPairsIterator implements MutableObjectIterator<IntPair> {
+		
 		private final int key;
 		private final int value;
 		
 		private int numLeft;
 		
-		public ConstantsIntPairsIterator(int key, int value, int count)
-		{
+		public ConstantsIntPairsIterator(int key, int value, int count) {
 			this.key = key;
 			this.value = value;
 			this.numLeft = count;
@@ -1483,17 +1472,7 @@ public class HashTableITCase {
 
 		@Override
 		public IntPair next() {
-			if (this.numLeft > 0) {
-				this.numLeft--;
-
-				IntPair result = new IntPair();
-				result.setKey(this.key);
-				result.setValue(this.value);
-				return result;
-			}
-			else {
-				return null;
-			}
+			return next(new IntPair());
 		}
 
 	}
@@ -1522,7 +1501,6 @@ public class HashTableITCase {
 			}
 		}
 
-
 		@Override
 		public int compareToReference(Record candidate) {
 			try {
@@ -1531,7 +1509,6 @@ public class HashTableITCase {
 			} catch (NullPointerException npex) {
 				throw new NullKeyFieldException();
 			}
-				
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 8db9934..55d01d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -304,16 +304,7 @@ public class MassiveStringSortingITCase {
 
 		@Override
 		public Tuple2<String, String[]> next() throws IOException {
-			String line = reader.readLine();
-			if (line == null) {
-				return null;
-			}
-
-			String[] parts = line.split(" ");
-			Tuple2<String, String[]> result = new Tuple2<String, String[]>();
-			result.f0 = parts[0];
-			result.f1 = parts;
-			return result;
+			return next(new Tuple2<String, String[]>());
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
index c221456..07330ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.util.ArrayList;
@@ -24,9 +23,7 @@ import java.util.Comparator;
 import java.util.List;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.runtime.operators.sort.MergeIterator;
 import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.Key;
@@ -37,10 +34,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-
-public class MergeIteratorTest
-{
-	private TypeSerializer<Record> serializer;
+public class MergeIteratorTest {
 	
 	private TypeComparator<Record> comparator;
 	
@@ -48,23 +42,21 @@ public class MergeIteratorTest
 	@SuppressWarnings("unchecked")
 	@Before
 	public void setup() {
-		this.serializer = RecordSerializer.get();
 		this.comparator = new RecordComparator(new int[] {0}, new Class[] { TestData.Key.class});
 	}
 	
 	
-	private MutableObjectIterator<Record> newIterator(final int[] keys, final String[] values)
-	{
-		return new MutableObjectIterator<Record>()
-		{
+	private MutableObjectIterator<Record> newIterator(final int[] keys, final String[] values) {
+		
+		return new MutableObjectIterator<Record>() {
+			
 			private Key key = new Key();
 			private Value value = new Value();
 			
 			private int current = 0;
 
 			@Override
-			public Record next(Record reuse)
-			{
+			public Record next(Record reuse) {
 				if (current < keys.length) {
 					key.setKey(keys[current]);
 					value.setValue(values[current]);
@@ -79,15 +71,10 @@ public class MergeIteratorTest
 			}
 
 			@Override
-			public Record next()
-			{
+			public Record next() {
 				if (current < keys.length) {
-					key.setKey(keys[current]);
-					value.setValue(values[current]);
+					Record result = new Record(new Key(keys[current]), new Value(values[current]));
 					current++;
-					Record result = new Record(2);
-					result.setField(0, key);
-					result.setField(1, value);
 					return result;
 				}
 				else {
@@ -111,7 +98,7 @@ public class MergeIteratorTest
 		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.serializer, this.comparator);
+		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
 
 		// check expected order
 		Record rec1 = new Record();
@@ -157,7 +144,7 @@ public class MergeIteratorTest
 		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.serializer, this.comparator);
+		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
 
 		int elementsFound = 1;
 		// check expected order
@@ -201,7 +188,7 @@ public class MergeIteratorTest
 		Comparator<TestData.Key> comparator = new TestData.KeyComparator();
 
 		// merge iterator
-		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.serializer, this.comparator);
+		MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
 
 		boolean violationFound = false;
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
index e873416..757b2e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
@@ -192,8 +192,8 @@ public class NonReusingSortMergeMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, serializer1, comparator1.duplicate());
-			MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, serializer2, comparator2.duplicate());
+			MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, comparator1.duplicate());
+			MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, comparator2.duplicate());
 			
 			// collect expected data
 			final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues(
@@ -218,8 +218,8 @@ public class NonReusingSortMergeMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new MergeIterator<Record>(inList1, serializer1, comparator1.duplicate());
-			input2 = new MergeIterator<Record>(inList2, serializer2, comparator2.duplicate());
+			input1 = new MergeIterator<Record>(inList1, comparator1.duplicate());
+			input2 = new MergeIterator<Record>(inList2, comparator2.duplicate());
 			
 			final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
index dd7248d..474fa3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
@@ -192,8 +192,8 @@ public class ReusingSortMergeMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 			
-			MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, serializer1, comparator1.duplicate());
-			MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, serializer2, comparator2.duplicate());
+			MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, comparator1.duplicate());
+			MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, comparator2.duplicate());
 			
 			// collect expected data
 			final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues(
@@ -218,8 +218,8 @@ public class ReusingSortMergeMatchIteratorITCase {
 			inList2.add(gen2Iter);
 			inList2.add(const2Iter);
 	
-			input1 = new MergeIterator<Record>(inList1, serializer1, comparator1.duplicate());
-			input2 = new MergeIterator<Record>(inList2, serializer2, comparator2.duplicate());
+			input1 = new MergeIterator<Record>(inList1, comparator1.duplicate());
+			input2 = new MergeIterator<Record>(inList2, comparator2.duplicate());
 			
 			final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 0e169ec..9b9609b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -170,7 +170,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		this.numFileHandles = numFileHandles;
 	}
 
-	@SuppressWarnings({"unchecked","rawtypes"})
+	@SuppressWarnings("rawtypes")
 	public void testDriver(PactDriver driver, Class stubClass) throws Exception {
 		testDriverInternal(driver, stubClass);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0629ea0..9a999fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java
index 88f16fc..8c07921 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.testutils;
 
 import java.io.IOException;
@@ -25,16 +24,14 @@ import java.util.Iterator;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  *
  */
-public class MutableObjectIteratorWrapper implements MutableObjectIterator<Record>
-{
+public class MutableObjectIteratorWrapper implements MutableObjectIterator<Record> {
+	
 	private final Iterator<Record> source;
 	
-	public MutableObjectIteratorWrapper(Iterator<Record> source)
-	{
+	public MutableObjectIteratorWrapper(Iterator<Record> source) {
 		this.source = source;
 	}
 
@@ -42,8 +39,7 @@ public class MutableObjectIteratorWrapper implements MutableObjectIterator<Recor
 	@Override
 	public Record next(Record reuse) throws IOException {
 		if (this.source.hasNext()) {
-			this.source.next().copyTo(reuse);
-			return reuse;
+			return this.source.next();
 		}
 		else {
 			return null;
@@ -52,6 +48,7 @@ public class MutableObjectIteratorWrapper implements MutableObjectIterator<Recor
 
 	@Override
 	public Record next() throws IOException {
+		// copy to be on the safe side
 		if (this.source.hasNext()) {
 			Record result = new Record();
 			this.source.next().copyTo(result);
@@ -61,6 +58,4 @@ public class MutableObjectIteratorWrapper implements MutableObjectIterator<Recor
 			return null;
 		}
 	}
-
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java
index 48a512c..19bd396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java
@@ -60,10 +60,7 @@ public class RandomIntPairGenerator implements MutableObjectIterator<IntPair>
 	@Override
 	public IntPair next() {
 		if (this.count++ < this.numRecords) {
-			IntPair result = new IntPair();
-			result.setKey(this.rnd.nextInt());
-			result.setValue(this.rnd.nextInt());
-			return result;
+			return new IntPair(this.rnd.nextInt(), this.rnd.nextInt());
 		} else {
 			return null;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index 5fe1303..400e798 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.testutils;
 
 import java.util.Comparator;
@@ -31,11 +30,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * Test data utilities classes.
  */
 public final class TestData {
+	
 	/**
 	 * Private constructor (container class should not be instantiated)
 	 */
-	private TestData() {
-	}
+	private TestData() {}
 
 	/**
 	 * Key comparator.
@@ -74,6 +73,7 @@ public final class TestData {
 	 * Value implementation.
 	 */
 	public static class Value extends StringValue {
+		
 		private static final long serialVersionUID = 1L;
 
 		public Value() {
@@ -84,13 +84,8 @@ public final class TestData {
 			super(v);
 		}
 		
-		/*
-		 * (non-Javadoc)
-		 * @see java.lang.Object#equals(java.lang.Object)
-		 */
 		@Override
-		public boolean equals(final Object obj)
-		{
+		public boolean equals(final Object obj) {
 			if (this == obj) {
 				return true;
 			}
@@ -119,7 +114,8 @@ public final class TestData {
 	/**
 	 * Pair generator.
 	 */
-	public static class Generator implements MutableObjectIterator<Record>{
+	public static class Generator implements MutableObjectIterator<Record> {
+		
 		public enum KeyMode {
 			SORTED, RANDOM
 		};
@@ -152,8 +148,7 @@ public final class TestData {
 			this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
 		}
 
-		public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode)
-		{
+		public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
 			this(seed, keyMax, valueLength, keyMode, valueMode, null);
 		}
 		
@@ -182,14 +177,7 @@ public final class TestData {
 		}
 
 		public Record next() {
-			this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
-			if (this.valueMode != ValueMode.CONSTANT) {
-				this.value.setValue(randomString());
-			}
-			Record result = new Record(2);
-			result.setField(0, this.key);
-			result.setField(1, this.value);
-			return result;
+			return next(new Record(2));
 		}
 
 		public boolean next(org.apache.flink.types.Value[] target) {
@@ -294,8 +282,8 @@ public final class TestData {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public static class ConstantValueIterator implements MutableObjectIterator<Record>
-	{
+	public static class ConstantValueIterator implements MutableObjectIterator<Record> {
+		
 		private final Key key;
 		private final Value value;
 		
@@ -307,10 +295,9 @@ public final class TestData {
 		private int pos;
 		
 		
-		public ConstantValueIterator(int keyValue, String valueValue, int numPairs)
-		{
+		public ConstantValueIterator(int keyValue, String valueValue, int numPairs) {
 			this.key = new Key(keyValue);
-			this.value = new Value();			
+			this.value = new Value();
 			this.valueValue = valueValue;
 			this.numPairs = numPairs;
 		}
@@ -331,20 +318,9 @@ public final class TestData {
 
 		@Override
 		public Record next() {
-			if (pos < this.numPairs) {
-				this.value.setValue(this.valueValue + ' ' + pos);
-				Record result = new Record(2);
-				result.setField(0, this.key);
-				result.setField(1, this.value);
-				pos++;
-				return result;
-			}
-			else {
-				return null;
-			}
+			return next(new Record(2));
 		}
 
-
 		public void reset() {
 			this.pos = 0;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java
index a7e2a7c..99ba7a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java
@@ -70,34 +70,6 @@ public class UniformIntPairGenerator implements MutableObjectIterator<IntPair>
 
 	@Override
 	public IntPair next() {
-		IntPair result = new IntPair();
-		if(!repeatKey) {
-			if(valCnt >= numVals) {
-				return null;
-			}
-
-			result.setKey(keyCnt++);
-			result.setValue(valCnt);
-
-			if(keyCnt == numKeys) {
-				keyCnt = 0;
-				valCnt++;
-			}
-		} else {
-			if(keyCnt >= numKeys) {
-				return null;
-			}
-
-			result.setKey(keyCnt);
-			result.setValue(valCnt++);
-
-			if(valCnt == numVals) {
-				valCnt = 0;
-				keyCnt++;
-			}
-		}
-
-		return result;
+		return next(new IntPair());
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java
index b628f05..e27bdfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.types.IntValue;
@@ -25,8 +24,8 @@ import org.apache.flink.util.MutableObjectIterator;
 
 public class UniformRecordGenerator implements MutableObjectIterator<Record> {
 
-	private final IntValue key = new IntValue();
-	private final IntValue value = new IntValue();
+	private IntValue key = new IntValue();
+	private IntValue value = new IntValue();
 	
 	int numKeys;
 	int numVals;
@@ -84,35 +83,8 @@ public class UniformRecordGenerator implements MutableObjectIterator<Record> {
 
 	@Override
 	public Record next() {
-		if(!repeatKey) {
-			if(valCnt >= numVals+startVal) {
-				return null;
-			}
-
-			key.setValue(keyCnt++);
-			value.setValue(valCnt);
-
-			if(keyCnt == numKeys+startKey) {
-				keyCnt = startKey;
-				valCnt++;
-			}
-		} else {
-			if(keyCnt >= numKeys+startKey) {
-				return null;
-			}
-			key.setValue(keyCnt);
-			value.setValue(valCnt++);
-
-			if(valCnt == numVals+startVal) {
-				valCnt = startVal;
-				keyCnt++;
-			}
-		}
-
-		Record result = new Record(2);
-		result.setField(0, this.key);
-		result.setField(1, this.value);
-		result.updateBinaryRepresenation();
-		return result;
+		key = new IntValue();
+		value = new IntValue();
+		return next(new Record(2));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java
index 45a44fa..3607421 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java
@@ -25,12 +25,12 @@ import org.apache.flink.util.MutableObjectIterator;
 
 public class UniformStringPairGenerator implements MutableObjectIterator<StringPair> {
 
-	final int numKeys;
-	final int numVals;
+	private final int numKeys;
+	private final int numVals;
 	
-	int keyCnt = 0;
-	int valCnt = 0;
-	boolean repeatKey;
+	private int keyCnt = 0;
+	private int valCnt = 0;
+	private boolean repeatKey;
 	
 	public UniformStringPairGenerator(int numKeys, int numVals, boolean repeatKey) {
 		this.numKeys = numKeys;
@@ -71,35 +71,6 @@ public class UniformStringPairGenerator implements MutableObjectIterator<StringP
 
 	@Override
 	public StringPair next() throws IOException {
-		StringPair result = new StringPair();
-		if(!repeatKey) {
-			if(valCnt >= numVals) {
-				return null;
-			}
-
-			result.setKey(Integer.toString(keyCnt++));
-			result.setValue(Integer.toBinaryString(valCnt));
-
-			if(keyCnt == numKeys) {
-				keyCnt = 0;
-				valCnt++;
-			}
-		} else {
-			if(keyCnt >= numKeys) {
-				return null;
-			}
-
-			result.setKey(Integer.toString(keyCnt));
-			result.setValue(Integer.toBinaryString(valCnt++));
-
-			if(valCnt == numVals) {
-				valCnt = 0;
-				keyCnt++;
-			}
-		}
-
-		return result;
+		return next(new StringPair());
 	}
-
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPair.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPair.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPair.java
index b832a46..93cd062 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPair.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPair.java
@@ -24,8 +24,7 @@ public class StringPair {
 	private String value;
 	
 	
-	public StringPair()
-	{}
+	public StringPair() {}
 	
 	public StringPair(String key, String value) {
 		this.key = key;

http://git-wip-us.apache.org/repos/asf/flink/blob/26c9819e/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
index 9f651b1..b71fdb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIteratorTest.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
@@ -94,11 +93,10 @@ public class NonReusingKeyGroupedIteratorTest {
 			}
 		};
 		
-		final RecordSerializer serializer = RecordSerializer.get();
 		@SuppressWarnings("unchecked")
 		final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class});
 		
-		this.psi = new NonReusingKeyGroupedIterator<Record>(this.sourceIter, serializer, comparator);
+		this.psi = new NonReusingKeyGroupedIterator<Record>(this.sourceIter, comparator);
 	}
 
 	@Test


Mime
View raw message