flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [11/13] flink git commit: [FLINK-1137] Enhance MutableObjectIterator with non-reuse next()
Date Thu, 08 Jan 2015 10:59:06 GMT
[FLINK-1137] Enhance MutableObjectIterator with non-reuse next()

This is in preparation for configurable object-reuse mode. We previously
referred to this as mutable object vs. mutable object safe mode or some
such thing.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3832d7b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3832d7b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3832d7b7

Branch: refs/heads/master
Commit: 3832d7b7a0216e5cf6e5100bb56b5a703d4fb79e
Parents: 2499294
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Dec 2 17:34:46 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jan 7 19:16:10 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/io/CoReaderIterator.java    |  9 +--
 .../flink/streaming/util/MockContext.java       | 11 ++++
 .../flink/util/MutableObjectIterator.java       | 20 +++++--
 .../BroadcastVariableMaterialization.java       |  8 +--
 .../io/disk/ChannelReaderInputViewIterator.java | 14 +++++
 .../runtime/io/disk/InputViewIterator.java      |  9 +++
 .../iterative/io/HashPartitionIterator.java     | 33 +++++++++++
 .../flink/runtime/operators/DataSinkTask.java   |  5 +-
 .../operators/hash/CompactingHashTable.java     | 21 +++++++
 .../runtime/operators/hash/HashPartition.java   | 23 +++++++-
 .../operators/hash/MutableHashTable.java        | 50 +++++++++++++++++
 .../AbstractBlockResettableIterator.java        | 10 ++++
 .../BlockResettableMutableObjectIterator.java   | 36 +++++++++++-
 ...SpillingResettableMutableObjectIterator.java | 35 +++++++++++-
 .../operators/sort/FixedLengthRecordSorter.java | 27 +++++++++
 .../runtime/operators/sort/MergeIterator.java   | 29 ++++++++++
 .../operators/sort/NormalizedKeySorter.java     | 30 ++++++++++
 .../runtime/operators/util/ReaderIterator.java  | 33 +++++++++--
 .../plugable/DeserializationDelegate.java       | 39 +------------
 .../NonReusingDeserializationDelegate.java      | 57 +++++++++++++++++++
 .../ReusingDeserializationDelegate.java         | 59 ++++++++++++++++++++
 .../util/EmptyMutableObjectIterator.java        | 11 ++++
 .../util/KeyGroupedMutableObjectIterator.java   | 37 ++++++++++++
 .../util/RegularToMutableObjectIterator.java    |  9 +++
 .../runtime/operators/hash/HashTableITCase.java | 30 ++++++++++
 .../sort/MassiveStringSortingITCase.java        | 19 +++++++
 .../operators/sort/MergeIteratorTest.java       | 17 ++++++
 .../operators/sort/MockRecordReader.java        | 26 +++++++++
 .../DelayingInfinitiveInputIterator.java        | 10 +++-
 .../testutils/InfiniteInputIterator.java        |  9 +++
 .../testutils/MutableObjectIteratorWrapper.java | 13 +++++
 .../testutils/RandomIntPairGenerator.java       | 12 ++++
 .../runtime/operators/testutils/TestData.java   | 40 ++++++++++++-
 .../testutils/UniformIntPairGenerator.java      | 33 +++++++++++
 .../testutils/UniformRecordGenerator.java       | 34 +++++++++++
 .../testutils/UniformStringPairGenerator.java   | 33 +++++++++++
 .../operators/testutils/UnionIterator.java      | 18 ++++++
 .../util/KeyGroupedIteratorImmutableTest.java   | 14 +++++
 .../runtime/util/KeyGroupedIteratorTest.java    | 15 +++++
 .../misc/MassiveCaseClassSortingITCase.scala    | 10 ++++
 40 files changed, 884 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
index e4110e7..ed90c03 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
 
 /**
  * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
@@ -31,15 +32,15 @@ public class CoReaderIterator<T1, T2> {
 	private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
 																									// source
 
-	protected final DeserializationDelegate<T1> delegate1;
-	protected final DeserializationDelegate<T2> delegate2;
+	protected final ReusingDeserializationDelegate<T1> delegate1;
+	protected final ReusingDeserializationDelegate<T2> delegate2;
 
 	public CoReaderIterator(
 			CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
 			TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
 		this.reader = reader;
-		this.delegate1 = new DeserializationDelegate<T1>(serializer1);
-		this.delegate2 = new DeserializationDelegate<T2>(serializer2);
+		this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1);
+		this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2);
 	}
 
 	public int next(T1 target1, T2 target2) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 87bedb2..5537052 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -72,6 +72,17 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
 			}
 			return reuse;
 		}
+
+		@Override
+		public StreamRecord<IN> next() throws IOException {
+			if (listIterator.hasNext()) {
+				StreamRecord<IN> result = new StreamRecord<IN>();
+				result.setObject(listIterator.next());
+				return result;
+			} else {
+				 return null;
+			}
+		}
 	}
 
 	public List<OUT> getOutputs() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java b/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
index b7b41d4..ea5ed78 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
@@ -21,16 +21,18 @@ import java.io.IOException;
 
 /**
  * A simple iterator interface. The key differences to the {@link java.util.Iterator} are that this
- * iterator accepts an object into which it can place the content if the object is mutable, and that
- * it consolidates the logic in a single <code>next()</code> function, rather than in two different
- * functions such as <code>hasNext()</code> and <code>next()</code>.
+ * iterator also has a <code>next()</code> method that accepts an object into which it can
+ * place the content if the object is mutable, and that it consolidates the logic in a single
+ * <code>next()</code> function, rather than in two different functions such as
+ * <code>hasNext()</code> and <code>next()</code>.
  * 
  * @param <E> The element type of the collection iterated over.
  */
 public interface MutableObjectIterator<E> {
 	
 	/**
-	 * Gets the next element from the collection. The contents of that next element is put into the given target object.
+	 * Gets the next element from the collection. The contents of that next element is put into the
+	 * given target object.
 	 * 
 	 * @param reuse The target object into which to place next element if E is mutable.
 	 * @return The filled object or <code>null</code> if the iterator is exhausted
@@ -39,4 +41,14 @@ public interface MutableObjectIterator<E> {
 	 *                     serialization / deserialization logic
 	 */
 	public E next(E reuse) throws IOException;
+
+	/**
+	 * Gets the next element from the collection. The iterator must create a new instance itself.
+	 *
+	 * @return The object or <code>null</code> if the iterator is exhausted
+	 *
+	 * @throws IOException Thrown, if a problem occurred in the underlying I/O layer or in the
+	 *                     serialization / deserialization logic
+	 */
+	public E next() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
index f7aebb6..5b5f2f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,10 +95,11 @@ public class BroadcastVariableMaterialization<T, C> {
 
 		try {
 			@SuppressWarnings("unchecked")
-			final MutableReader<DeserializationDelegate<T>> typedReader = (MutableReader<DeserializationDelegate<T>>) reader;
+			final MutableReader typedReader = (MutableReader) reader;
 			@SuppressWarnings("unchecked")
 			final TypeSerializer<T> serializer = ((TypeSerializerFactory<T>) serializerFactory).getSerializer();
-			
+
+			@SuppressWarnings("unchecked")
 			final ReaderIterator<T> readerIterator = new ReaderIterator<T>(typedReader, serializer);
 			
 			if (materializer) {
@@ -111,7 +111,7 @@ public class BroadcastVariableMaterialization<T, C> {
 				ArrayList<T> data = new ArrayList<T>();
 				
 				T element;
-				while ((element = readerIterator.next(serializer.createInstance())) != null) {
+				while ((element = readerIterator.next()) != null) {
 					data.add(element);
 				}
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
index f38aa25..6007db9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
@@ -92,4 +92,18 @@ public class ChannelReaderInputViewIterator<E> implements MutableObjectIterator<
 			return null;
 		}
 	}
+
+	@Override
+	public E next() throws IOException
+	{
+		try {
+			return this.accessors.deserialize(this.inView);
+		} catch (EOFException eofex) {
+			final List<MemorySegment> freeMem = this.inView.close();
+			if (this.freeMemTarget != null) {
+				this.freeMemTarget.addAll(freeMem);
+			}
+			return null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java
index 74562c2..4f37549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java
@@ -45,4 +45,13 @@ public class InputViewIterator<E> implements MutableObjectIterator<E>
 			return null;
 		}
 	}
+
+	@Override
+	public E next() throws IOException {
+		try {
+			return this.serializer.deserialize(this.inputView);
+		} catch (EOFException e) {
+			return null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
index 2daf5cf..209fb79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
@@ -65,6 +65,24 @@ public class HashPartitionIterator<BT, PT> implements MutableObjectIterator<BT>
 		return reuse;
 	}
 
+	@Override
+	public BT next() throws IOException {
+		if (currentPartition == null) {
+			if (!partitions.hasNext()) {
+				return null;
+			}
+			currentPartition = partitions.next();
+			currentPartition.setReadPosition(0);
+		}
+
+		try {
+			return serializer.deserialize(currentPartition);
+		} catch (EOFException e) {
+			return advanceAndRead();
+		}
+
+	}
+
 	/* jump to the next partition and continue reading from that */
 	private BT advanceAndRead(BT reuse) throws IOException {
 		if (!partitions.hasNext()) {
@@ -81,4 +99,19 @@ public class HashPartitionIterator<BT, PT> implements MutableObjectIterator<BT>
 		return reuse;
 	}
 
+	/* jump to the next partition and continue reading from that */
+	private BT advanceAndRead() throws IOException {
+		if (!partitions.hasNext()) {
+			return null;
+		}
+		currentPartition = partitions.next();
+		currentPartition.setReadPosition(0);
+
+		try {
+			return serializer.deserialize(currentPartition);
+		} catch (EOFException e) {
+			return advanceAndRead();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 74c625f..610ab06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
@@ -38,7 +39,6 @@ import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.CloseableInputProvider;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
@@ -334,9 +334,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		
 		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
 		
-		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
 		@SuppressWarnings({ "rawtypes" })
-		final MutableObjectIterator<?> iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer());
+		final MutableObjectIterator<?> iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());
 		this.reader = (MutableObjectIterator<IT>)iter;
 		
 		// final sanity check

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 4f10e0b..4c39f28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -1238,6 +1238,27 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			}
 		}
 
+		@Override
+		public T next() throws IOException {
+			// This is just a copy of the above, I wanted to keep the two separate,
+			// in case we change something later. Plus, it keeps the diff clean... :D
+			if(done || this.table.closed.get()) {
+				return null;
+			} else if(!cache.isEmpty()) {
+				return cache.remove(cache.size()-1);
+			} else {
+				while(!done && cache.isEmpty()) {
+					done = !fillCache();
+				}
+				if(!done) {
+					return cache.remove(cache.size()-1);
+				} else {
+					return null;
+				}
+			}
+		}
+
+
 		/**
 		 * utility function that inserts all entries from a bucket and its overflow buckets into the cache
 		 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 08acd16..23a415d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -44,8 +44,8 @@ import org.apache.flink.util.MutableObjectIterator;
 
 /**
  * 
- * @param <BT> The type of the build side records.
- * @param <PT> The type of the probe side records.
+ * @tparam BT The type of the build side records.
+ * @tparam PT The type of the probe side records.
  */
 public class HashPartition<BT, PT> extends AbstractPagedInputView implements SeekableDataInputView
 {
@@ -620,7 +620,24 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 				return null;
 			}
 		}
-		
+
+		public final BT next() throws IOException
+		{
+			final int pos = getCurrentPositionInSegment();
+			final int buffer = HashPartition.this.currentBufferNum;
+
+			this.currentPointer = (((long) buffer) << HashPartition.this.segmentSizeBits) + pos;
+
+			try {
+				BT result = HashPartition.this.buildSideSerializer.deserialize(HashPartition.this);
+				this.currentHashCode = this.comparator.hash(result);
+				return result;
+			} catch (EOFException eofex) {
+				return null;
+			}
+		}
+
+
 		protected final long getPointer()
 		{
 			return this.currentPointer;

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 67f1ea2..e69ef17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -1386,6 +1386,56 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				this.numInSegment = 0;
 			}
 		}
+
+		public BT next() {
+			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
+			while (true) {
+
+				while (this.numInSegment < this.countInSegment) {
+
+					final int thisCode = this.bucket.getInt(this.posInSegment);
+					this.posInSegment += HASH_CODE_LEN;
+
+					// check if the hash code matches
+					if (thisCode == this.searchHashCode) {
+						// get the pointer to the pair
+						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
+								BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
+						this.numInSegment++;
+
+						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
+						try {
+							this.partition.setReadPosition(pointer);
+							BT result = this.accessor.deserialize(this.partition);
+							if (this.comparator.equalToReference(result)) {
+								this.lastPointer = pointer;
+								return result;
+							}
+						}
+						catch (IOException ioex) {
+							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
+									ioex.getMessage(), ioex);
+						}
+					}
+					else {
+						this.numInSegment++;
+					}
+				}
+
+				// this segment is done. check if there is another chained bucket
+				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
+				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+					return null;
+				}
+
+				final int overflowSegNum = (int) (forwardPointer >>> 32);
+				this.bucket = this.overflowSegments[overflowSegNum];
+				this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
+				this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
+				this.numInSegment = 0;
+			}
+		}
 		
 		public void writeBack(BT value) throws IOException {
 			final SeekableDataOutputView outView = this.partition.getWriteView();

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
index 421e0c7..730d19a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java
@@ -173,4 +173,14 @@ abstract class AbstractBlockResettableIterator<T> implements MemoryBlockIterator
 			return null;
 		}
 	}
+
+	protected T getNextRecord() throws IOException {
+		if (this.numRecordsReturned < this.numRecordsInBuffer) {
+			this.numRecordsReturned++;
+			return this.serializer.deserialize(this.readView);
+		} else {
+			return null;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java
index 4fe7dbb..abce462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java
@@ -103,7 +103,41 @@ public class BlockResettableMutableObjectIterator<T> extends AbstractBlockResett
 			}
 		}
 	}
-	
+
+	@Override
+	public T next() throws IOException {
+		// check for the left over element
+		if (this.readPhase) {
+			return getNextRecord();
+		} else {
+			// writing phase. check for leftover first
+			T result = null;
+			if (this.leftOverReturned) {
+				// get next record
+				if ((result = this.input.next()) != null) {
+					if (writeNextRecord(result)) {
+						return result;
+					} else {
+						// did not fit into memory, keep as leftover
+						this.leftOverRecord = this.serializer.copy(result);
+						this.leftOverReturned = false;
+						this.fullWriteBuffer = true;
+						return null;
+					}
+				} else {
+					this.noMoreBlocks = true;
+					return null;
+				}
+			} else if (this.fullWriteBuffer) {
+				return null;
+			} else {
+				this.leftOverReturned = true;
+				return this.leftOverRecord;
+			}
+		}
+	}
+
+
 
 	public void reset() {
 		// a reset always goes to the read phase

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
index 1d0b540..5467ae9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java
@@ -159,7 +159,40 @@ public class SpillingResettableMutableObjectIterator<T> implements ResettableMut
 			}
 		}
 	}
-	
+
+	@Override
+	public T next() throws IOException {
+		T result = null;
+		if (this.inView != null) {
+			// reading, any subsequent pass
+			if (this.currentElementNum < this.elementCount) {
+				try {
+					result = this.serializer.deserialize(this.inView);
+				} catch (IOException e) {
+					throw new RuntimeException("SpillingIterator: Error reading element from buffer.", e);
+				}
+				this.currentElementNum++;
+				return result;
+			} else {
+				return null;
+			}
+		} else {
+			// writing pass (first)
+			if ((result = this.input.next()) != null) {
+				try {
+					this.serializer.serialize(result, this.buffer);
+				} catch (IOException e) {
+					throw new RuntimeException("SpillingIterator: Error writing element to buffer.", e);
+				}
+				this.elementCount++;
+				return result;
+			} else {
+				return null;
+			}
+		}
+	}
+
+
 	public void consumeAndCacheRemainingData() throws IOException {
 		// check that we are in the first pass and that more input data is left
 		if (this.inView == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index 46399e9..f05694b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -358,6 +358,33 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 					return null;
 				}
 			}
+
+			@Override
+			public T next() {
+				if (this.currentTotal < this.numTotal) {
+
+					if (this.currentInSegment >= this.numPerSegment) {
+						this.currentInSegment = 0;
+						this.currentSegmentIndex++;
+						this.in.set(sortBuffer.get(this.currentSegmentIndex), 0);
+					}
+
+					this.currentTotal++;
+					this.currentInSegment++;
+
+					try {
+						// This might blow up in our face, but we ignore the readWithNormalization/
+						// writeWithNormalization methods for now.
+						return this.comp.readWithKeyDenormalization(null, this.in);
+					}
+					catch (IOException ioe) {
+						throw new RuntimeException(ioe);
+					}
+				}
+				else {
+					return null;
+				}
+			}
 		};
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
index 7a5012d..f3dc50e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
@@ -93,6 +93,35 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
 		}
 	}
 
+	/**
+	 * Gets the next smallest element, with respect to the definition of order implied by
+	 * the {@link TypeSerializer} provided to this iterator.
+	 *
+	 * @return The next smallest element, or <code>null</code> if the iterator is exhausted.
+	 *
+	 * @see org.apache.flink.util.MutableObjectIterator#next()
+	 */
+	@Override
+	public E next() throws IOException
+	{
+		if (this.heap.size() > 0) {
+			// get the smallest element
+			final HeadStream<E> top = this.heap.peek();
+			E result = this.serializer.copy(top.getHead());
+
+			// read an element
+			if (!top.nextHead()) {
+				this.heap.poll();
+			} else {
+				this.heap.adjustTop();
+			}
+			return result;
+		}
+		else {
+			return null;
+		}
+	}
+
 	// ============================================================================================
 	//                      Internal Classes that wrap the sorted input streams
 	// ============================================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index c69474a..c382708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -322,6 +322,11 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 		this.recordBuffer.setReadPosition(pointer);
 		return this.serializer.deserialize(reuse, this.recordBuffer);
 	}
+
+	private final T getRecordFromBuffer(long pointer) throws IOException {
+		this.recordBuffer.setReadPosition(pointer);
+		return this.serializer.deserialize(this.recordBuffer);
+	}
 	
 	private final int compareRecords(long pointer1, long pointer2) {
 		this.recordBuffer.setReadPosition(pointer1);
@@ -431,6 +436,31 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 					return null;
 				}
 			}
+
+			@Override
+			public T next()
+			{
+				if (this.current < this.size) {
+					this.current++;
+					if (this.currentOffset > lastIndexEntryOffset) {
+						this.currentOffset = 0;
+						this.currentIndexSegment = sortIndex.get(++this.currentSegment);
+					}
+
+					long pointer = this.currentIndexSegment.getLong(this.currentOffset);
+					this.currentOffset += indexEntrySize;
+
+					try {
+						return getRecordFromBuffer(pointer);
+					}
+					catch (IOException ioe) {
+						throw new RuntimeException(ioe);
+					}
+				}
+				else {
+					return null;
+				}
+			}
 		};
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java
index 0d29d5e..606c50c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
 import org.apache.flink.util.MutableObjectIterator;
 
 
@@ -32,9 +34,10 @@ import org.apache.flink.util.MutableObjectIterator;
  */
 public final class ReaderIterator<T> implements MutableObjectIterator<T> {
 	
-	private final MutableReader<DeserializationDelegate<T>> reader;		// the source
+	private final MutableReader reader;		// the source
 	
-	private final DeserializationDelegate<T> delegate;
+	private final ReusingDeserializationDelegate<T> reusingDelegate;
+	private final NonReusingDeserializationDelegate<T> nonReusingDelegate;
 
 	/**
 	 * Creates a new iterator, wrapping the given reader.
@@ -43,15 +46,33 @@ public final class ReaderIterator<T> implements MutableObjectIterator<T> {
 	 */
 	public ReaderIterator(MutableReader<DeserializationDelegate<T>> reader, TypeSerializer<T> serializer) {
 		this.reader = reader;
-		this.delegate = new DeserializationDelegate<T>(serializer);
+		this.reusingDelegate = new ReusingDeserializationDelegate<T>(serializer);
+		this.nonReusingDelegate = new NonReusingDeserializationDelegate<T>(serializer);
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public T next(T target) throws IOException {
-		this.delegate.setInstance(target);
+		this.reusingDelegate.setInstance(target);
 		try {
-			if (this.reader.next(this.delegate)) {
-				return this.delegate.getInstance();
+			if (this.reader.next(this.reusingDelegate)) {
+				return this.reusingDelegate.getInstance();
+			} else {
+				return null;
+			}
+
+		}
+		catch (InterruptedException e) {
+			throw new IOException("Reader interrupted.", e);
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public T next() throws IOException {
+		try {
+			if (this.reader.next(this.nonReusingDelegate)) {
+				return this.nonReusingDelegate.getInstance();
 			} else {
 				return null;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java
index fbe91ed..9ca5954 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java
@@ -15,45 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.flink.runtime.plugable;
 
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-
-public class DeserializationDelegate<T> implements IOReadableWritable {
-	
-	private T instance;
-	
-	private final TypeSerializer<T> serializer;
-	
-
-	public DeserializationDelegate(TypeSerializer<T> serializer) {
-		this.serializer = serializer;
-	}
-
-	
-	public void setInstance(T instance) {
-		this.instance = instance;
-	}
-
-	public T getInstance() {
-		return instance;
-	}
 
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		throw new IllegalStateException("Serialization method called on DeserializationDelegate.");
-	}
+public interface DeserializationDelegate<T> extends IOReadableWritable {
+	void setInstance(T instance);
 
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.instance = this.serializer.deserialize(this.instance, in);
-	}
+	T getInstance();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/NonReusingDeserializationDelegate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/NonReusingDeserializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/NonReusingDeserializationDelegate.java
new file mode 100644
index 0000000..859f354
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/NonReusingDeserializationDelegate.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.plugable;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+
+public class NonReusingDeserializationDelegate<T> implements DeserializationDelegate<T> {
+
+	private T instance;
+
+	private final TypeSerializer<T> serializer;
+
+
+	public NonReusingDeserializationDelegate(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+	}
+	
+	public void setInstance(T instance) {
+		this.instance = instance;
+	}
+
+	public T getInstance() {
+		return instance;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		throw new IllegalStateException("Serialization method called on DeserializationDelegate.");
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.instance = this.serializer.deserialize(in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/ReusingDeserializationDelegate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/ReusingDeserializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/ReusingDeserializationDelegate.java
new file mode 100644
index 0000000..f3c254b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/ReusingDeserializationDelegate.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.plugable;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+
+public class ReusingDeserializationDelegate<T> implements DeserializationDelegate<T> {
+	
+	private T instance;
+	
+	private final TypeSerializer<T> serializer;
+	
+
+	public ReusingDeserializationDelegate(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+	}
+	
+	@Override
+	public void setInstance(T instance) {
+		this.instance = instance;
+	}
+
+	@Override
+	public T getInstance() {
+		return instance;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		throw new IllegalStateException("Serialization method called on DeserializationDelegate.");
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.instance = this.serializer.deserialize(this.instance, in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java
index 71fb30f..12ae5c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java
@@ -54,4 +54,15 @@ public final class EmptyMutableObjectIterator<E> implements MutableObjectIterato
 	public E next(E target) {
 		return null;
 	}
+
+	/**
+	 * Always returns null.
+	 *
+	 * @see MutableObjectIterator#next()
+	 */
+	@Override
+	public E next() {
+		return null;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
index 88097cd..c139aca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
@@ -167,5 +167,42 @@ public final class KeyGroupedMutableObjectIterator<E> {
 					ioex.getMessage(), ioex);
 			}
 		}
+
+		@Override
+		public E next()
+		{
+			if (KeyGroupedMutableObjectIterator.this.next == null || KeyGroupedMutableObjectIterator.this.nextIsFresh) {
+				return null;
+			}
+			if (this.nextIsUnconsumed) {
+				return this.serializer.copy(KeyGroupedMutableObjectIterator.this.next);
+			}
+
+			E result = null;
+			try {
+				if ((result = KeyGroupedMutableObjectIterator.this.iterator.next()) != null) {
+					// check whether the keys are equal
+					if (!this.comparator.equalToReference(result)) {
+						// moved to the next key, no more values here
+						KeyGroupedMutableObjectIterator.this.next =
+								this.serializer.copy(result);
+						KeyGroupedMutableObjectIterator.this.nextIsFresh = true;
+						return null;
+					}
+					// same key, the freshly read value is in "result"
+					return result;
+				}
+				else {
+					// backing iterator is consumed
+					KeyGroupedMutableObjectIterator.this.next = null;
+					return null;
+				}
+			}
+			catch (IOException ioex) {
+				throw new RuntimeException("An error occurred while reading the next record: " +
+						ioex.getMessage(), ioex);
+			}
+		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java
index f2fea80..8eb17c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java
@@ -47,4 +47,13 @@ public class RegularToMutableObjectIterator<T> implements MutableObjectIterator<
 			return null;
 		}
 	}
+
+	@Override
+	public T next() {
+		if (this.iterator.hasNext()) {
+			return this.serializer.copy(this.iterator.next());
+		} else {
+			return null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 877c5cc..27ece69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -1433,6 +1433,20 @@ public class HashTableITCase {
 				return null;
 			}
 		}
+
+		@Override
+		public Record next() {
+			if (this.numLeft > 0) {
+				this.numLeft--;
+				Record result = new Record(2);
+				result.setField(0, this.key);
+				result.setField(1, this.value);
+				return result;
+			}
+			else {
+				return null;
+			}
+		}
 	}
 	
 	// ============================================================================================
@@ -1466,6 +1480,22 @@ public class HashTableITCase {
 				return null;
 			}
 		}
+
+		@Override
+		public IntPair next() {
+			if (this.numLeft > 0) {
+				this.numLeft--;
+
+				IntPair result = new IntPair();
+				result.setKey(this.key);
+				result.setValue(this.value);
+				return result;
+			}
+			else {
+				return null;
+			}
+		}
+
 	}
 	
 	// ============================================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 9dec847..8db9934 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -274,6 +274,11 @@ public class MassiveStringSortingITCase {
 		public String next(String reuse) throws IOException {
 			return reader.readLine();
 		}
+
+		@Override
+		public String next() throws IOException {
+			return reader.readLine();
+		}
 	}
 	
 	private static final class StringTupleReaderMutableObjectIterator implements MutableObjectIterator<Tuple2<String, String[]>> {
@@ -296,6 +301,20 @@ public class MassiveStringSortingITCase {
 			reuse.f1 = parts;
 			return reuse;
 		}
+
+		@Override
+		public Tuple2<String, String[]> next() throws IOException {
+			String line = reader.readLine();
+			if (line == null) {
+				return null;
+			}
+
+			String[] parts = line.split(" ");
+			Tuple2<String, String[]> result = new Tuple2<String, String[]>();
+			result.f0 = parts[0];
+			result.f1 = parts;
+			return result;
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
index 7d681d8..c221456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
@@ -77,6 +77,23 @@ public class MergeIteratorTest
 					return null;
 				}
 			}
+
+			@Override
+			public Record next()
+			{
+				if (current < keys.length) {
+					key.setKey(keys[current]);
+					value.setValue(values[current]);
+					current++;
+					Record result = new Record(2);
+					result.setField(0, key);
+					result.setField(1, value);
+					return result;
+				}
+				else {
+					return null;
+				}
+			}
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java
index 738e7fc..e5b8ed0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java
@@ -65,6 +65,32 @@ public class MockRecordReader implements MutableObjectIterator<Record> {
 		}
 	}
 
+	@Override
+	public Record next() {
+		Record r = null;
+		while (r == null) {
+			try {
+				r = queue.take();
+			} catch (InterruptedException iex) {
+				throw new RuntimeException("Reader was interrupted.");
+			}
+		}
+
+		if (r == SENTINEL) {
+			// put the sentinel back, to ensure that repeated calls do not block
+			try {
+				queue.put(r);
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Reader was interrupted.");
+			}
+			return null;
+		} else {
+			Record result = new Record(r.getNumFields());
+			r.copyTo(result);
+			return result;
+		}
+	}
+
 	public void emit(Record element) throws InterruptedException {
 		queue.put(element.createCopy());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java
index 7e61a18..f21d53f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java
@@ -37,5 +37,13 @@ public class DelayingInfinitiveInputIterator extends InfiniteInputIterator
 		catch (InterruptedException e) { }
 		return super.next(reuse);
 	}
-	
+
+	@Override
+	public Record next() {
+		try {
+			Thread.sleep(delay);
+		}
+		catch (InterruptedException e) { }
+		return super.next();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java
index 2ae103b..5c10f5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java
@@ -37,4 +37,13 @@ public class InfiniteInputIterator implements MutableObjectIterator<Record>
 		reuse.setField(1, val2);
 		return reuse;
 	}
+
+	@Override
+	public Record next() {
+		Record result = new Record(2);
+		result.setField(0, val1);
+		result.setField(1, val2);
+		return result;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java
index 8a01f12..88f16fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java
@@ -50,4 +50,17 @@ public class MutableObjectIteratorWrapper implements MutableObjectIterator<Recor
 		}
 	}
 
+	@Override
+	public Record next() throws IOException {
+		if (this.source.hasNext()) {
+			Record result = new Record();
+			this.source.next().copyTo(result);
+			return result;
+		}
+		else {
+			return null;
+		}
+	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java
index 57b5351..48a512c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/RandomIntPairGenerator.java
@@ -56,6 +56,18 @@ public class RandomIntPairGenerator implements MutableObjectIterator<IntPair>
 			return null;
 		}
 	}
+
+	@Override
+	public IntPair next() {
+		if (this.count++ < this.numRecords) {
+			IntPair result = new IntPair();
+			result.setKey(this.rnd.nextInt());
+			result.setValue(this.rnd.nextInt());
+			return result;
+		} else {
+			return null;
+		}
+	}
 	
 	public void reset() {
 		this.rnd = new Random(this.seed);

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index c28e542..5fe1303 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -181,6 +181,17 @@ public final class TestData {
 			return reuse;
 		}
 
+		public Record next() {
+			this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
+			if (this.valueMode != ValueMode.CONSTANT) {
+				this.value.setValue(randomString());
+			}
+			Record result = new Record(2);
+			result.setField(0, this.key);
+			result.setField(1, this.value);
+			return result;
+		}
+
 		public boolean next(org.apache.flink.types.Value[] target) {
 			this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
 			// TODO change this to something proper
@@ -264,6 +275,17 @@ public final class TestData {
 				return null;
 			}
 		}
+
+		@Override
+		public Record next() {
+			if (counter < numberOfRecords) {
+				counter++;
+				return generator.next();
+			}
+			else {
+				return null;
+			}
+		}
 		
 		public void reset() {
 			this.counter = 0;
@@ -306,7 +328,23 @@ public final class TestData {
 				return null;
 			}
 		}
-		
+
+		@Override
+		public Record next() {
+			if (pos < this.numPairs) {
+				this.value.setValue(this.valueValue + ' ' + pos);
+				Record result = new Record(2);
+				result.setField(0, this.key);
+				result.setField(1, this.value);
+				pos++;
+				return result;
+			}
+			else {
+				return null;
+			}
+		}
+
+
 		public void reset() {
 			this.pos = 0;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java
index 5d820b9..a7e2a7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java
@@ -67,4 +67,37 @@ public class UniformIntPairGenerator implements MutableObjectIterator<IntPair>
 		
 		return target;
 	}
+
+	@Override
+	public IntPair next() {
+		IntPair result = new IntPair();
+		if(!repeatKey) {
+			if(valCnt >= numVals) {
+				return null;
+			}
+
+			result.setKey(keyCnt++);
+			result.setValue(valCnt);
+
+			if(keyCnt == numKeys) {
+				keyCnt = 0;
+				valCnt++;
+			}
+		} else {
+			if(keyCnt >= numKeys) {
+				return null;
+			}
+
+			result.setKey(keyCnt);
+			result.setValue(valCnt++);
+
+			if(valCnt == numVals) {
+				valCnt = 0;
+				keyCnt++;
+			}
+		}
+
+		return result;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java
index 8e35053..b628f05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java
@@ -81,4 +81,38 @@ public class UniformRecordGenerator implements MutableObjectIterator<Record> {
 		reuse.updateBinaryRepresenation();
 		return reuse;
 	}
+
+	@Override
+	public Record next() {
+		if(!repeatKey) {
+			if(valCnt >= numVals+startVal) {
+				return null;
+			}
+
+			key.setValue(keyCnt++);
+			value.setValue(valCnt);
+
+			if(keyCnt == numKeys+startKey) {
+				keyCnt = startKey;
+				valCnt++;
+			}
+		} else {
+			if(keyCnt >= numKeys+startKey) {
+				return null;
+			}
+			key.setValue(keyCnt);
+			value.setValue(valCnt++);
+
+			if(valCnt == numVals+startVal) {
+				valCnt = startVal;
+				keyCnt++;
+			}
+		}
+
+		Record result = new Record(2);
+		result.setField(0, this.key);
+		result.setField(1, this.value);
+		result.updateBinaryRepresenation();
+		return result;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java
index ef697a8..45a44fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java
@@ -69,4 +69,37 @@ public class UniformStringPairGenerator implements MutableObjectIterator<StringP
 		return target;
 	}
 
+	@Override
+	public StringPair next() throws IOException {
+		StringPair result = new StringPair();
+		if(!repeatKey) {
+			if(valCnt >= numVals) {
+				return null;
+			}
+
+			result.setKey(Integer.toString(keyCnt++));
+			result.setValue(Integer.toBinaryString(valCnt));
+
+			if(keyCnt == numKeys) {
+				keyCnt = 0;
+				valCnt++;
+			}
+		} else {
+			if(keyCnt >= numKeys) {
+				return null;
+			}
+
+			result.setKey(Integer.toString(keyCnt));
+			result.setValue(Integer.toBinaryString(valCnt++));
+
+			if(valCnt == numVals) {
+				valCnt = 0;
+				keyCnt++;
+			}
+		}
+
+		return result;
+	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
index 824e96b..3a76ebd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
@@ -57,4 +57,22 @@ public class UnionIterator<E> implements MutableObjectIterator<E>
 			}
 		}
 	}
+
+	@Override
+	public E next() throws IOException
+	{
+		E targetStaging = this.currentSource.next();
+		if (targetStaging != null) {
+			return targetStaging;
+		} else {
+			if (this.nextSources.size() > 0) {
+				this.currentSource = this.nextSources.remove(0);
+				return next();
+			}
+			else {
+				return null;
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
index 54a1492..3d1a80b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
@@ -78,6 +78,20 @@ public class KeyGroupedIteratorImmutableTest {
 					return null;
 				}
 			}
+
+			@Override
+			public Record next() throws IOException {
+				if (it.hasNext()) {
+					IntStringPair pair = it.next();
+					Record result = new Record(2);
+					result.setField(0, pair.getInteger());
+					result.setField(1, pair.getString());
+					return result;
+				}
+				else {
+					return null;
+				}
+			}
 		};
 		
 		final RecordSerializer serializer = RecordSerializer.get();

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
index 85ca9a9..39ff077 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java
@@ -79,6 +79,21 @@ public class KeyGroupedIteratorTest {
 					return null;
 				}
 			}
+
+			@Override
+			public Record next() throws IOException {
+				if (it.hasNext()) {
+					IntStringPair pair = it.next();
+					Record result = new Record(2);
+					result.setField(0, pair.getInteger());
+					result.setField(1, pair.getString());
+					return result;
+				}
+				else {
+					return null;
+				}
+			}
+
 		};
 		
 		final RecordSerializer serializer = RecordSerializer.get();

http://git-wip-us.apache.org/repos/asf/flink/blob/3832d7b7/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index a34c7d8..7c6bcaf 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -216,6 +216,16 @@ class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterato
     val parts = line.split(" ")
     StringTuple(parts(0), parts(1), parts)
   }
+
+  /** Reads one line and splits it on single spaces into a new StringTuple; null when the reader has no more lines. */
+  override def next(): StringTuple =
+    reader.readLine() match {
+      case null => null
+      case line =>
+        val tokens = line.split(" ")
+        StringTuple(tokens(0), tokens(1), tokens)
+    }
+
 }
 
 class DummyInvokable extends AbstractInvokable {


Mime
View raw message