flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [02/13] flink git commit: [FLINK-1285] Make Merge-Join aware of object-reuse setting
Date Thu, 08 Jan 2015 10:58:57 GMT
[FLINK-1285] Make Merge-Join aware of object-reuse setting

This closes #259


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d529749c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d529749c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d529749c

Branch: refs/heads/master
Commit: d529749c8f45af693efffe1f69860dae0bfe70bf
Parents: b7b32a0
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Dec 11 14:58:23 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jan 7 19:16:10 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/operators/MatchDriver.java    |   7 +-
 .../resettable/BlockResettableIterator.java     | 210 ---------
 .../NonReusingBlockResettableIterator.java      | 204 +++++++++
 .../ReusingBlockResettableIterator.java         | 100 +++++
 .../operators/sort/MergeMatchIterator.java      | 429 ------------------
 .../sort/NonReusingMergeMatchIterator.java      | 424 ++++++++++++++++++
 .../sort/ReusingMergeMatchIterator.java         | 435 +++++++++++++++++++
 .../resettable/BlockResettableIteratorTest.java | 202 ---------
 .../NonReusingBlockResettableIteratorTest.java  | 201 +++++++++
 .../ReusingBlockResettableIteratorTest.java     | 201 +++++++++
 .../NonReusingSortMergeMatchIteratorITCase.java | 371 ++++++++++++++++
 .../ReusingSortMergeMatchIteratorITCase.java    | 371 ++++++++++++++++
 .../sort/SortMergeMatchIteratorITCase.java      | 373 ----------------
 .../operators/util/HashVsSortMiniBenchmark.java |   6 +-
 14 files changed, 2314 insertions(+), 1220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index 2d051ad..f8e4a29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.NonReusingMergeMatchIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -32,7 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
+import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -125,7 +126,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
 		if (this.objectReuseEnabled) {
 			switch (ls) {
 				case MERGE:
-					this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+					this.matchIterator = new ReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
@@ -140,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
 		} else {
 			switch (ls) {
 				case MERGE:
-					this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+					this.matchIterator = new NonReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
 					break;
 				case HYBRIDHASH_BUILD_FIRST:

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java
deleted file mode 100644
index 0019c8c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableIterator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.resettable;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.util.ResettableIterator;
-
-/**
- * Implementation of an iterator that fetches a block of data into main memory and offers resettable
- * access to the data in that block.
- * 
- */
-public class BlockResettableIterator<T> extends AbstractBlockResettableIterator<T> implements ResettableIterator<T> {
-	
-	public static final Logger LOG = LoggerFactory.getLogger(BlockResettableIterator.class);
-	
-	// ------------------------------------------------------------------------
-	
-	protected Iterator<T> input;
-	
-	private T nextElement;
-
-	private final T reuseElement;
-	
-	private T leftOverElement;
-	
-	private boolean readPhase;
-	
-	private boolean noMoreBlocks;
-	
-	// ------------------------------------------------------------------------
-	
-	public BlockResettableIterator(MemoryManager memoryManager, Iterator<T> input,
-			TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask)
-	throws MemoryAllocationException
-	{
-		this(memoryManager, serializer, numPages, ownerTask);
-		this.input = input;
-	}
-	
-	public BlockResettableIterator(MemoryManager memoryManager,
-			TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask)
-	throws MemoryAllocationException
-	{
-		super(serializer, memoryManager, numPages, ownerTask);
-		
-		this.reuseElement = serializer.createInstance();
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public void reopen(Iterator<T> input) throws IOException {
-		this.input = input;
-		
-		this.noMoreBlocks = false;
-		this.closed = false;
-		
-		nextBlock();
-	}
-	
-	
-
-	@Override
-	public boolean hasNext() {
-		try {
-			if (this.nextElement == null) {
-				if (this.readPhase) {
-					// read phase, get next element from buffer
-					T tmp = getNextRecord(this.reuseElement);
-					if (tmp != null) {
-						this.nextElement = tmp;
-						return true;
-					} else {
-						return false;
-					}
-				} else {
-					if (this.input.hasNext()) {
-						final T next = this.input.next();
-						if (writeNextRecord(next)) {
-							this.nextElement = next;
-							return true;
-						} else {
-							this.leftOverElement = next;
-							return false;
-						}
-					} else {
-						this.noMoreBlocks = true;
-						return false;
-					}
-				}
-			} else {
-				return true;
-			}
-		} catch (IOException ioex) {
-			throw new RuntimeException("Error (de)serializing record in block resettable iterator.", ioex);
-		}
-	}
-	
-
-	@Override
-	public T next() {
-		if (this.nextElement == null) {
-			if (!hasNext()) {
-				throw new NoSuchElementException();
-			}
-		}
-		
-		T out = this.nextElement;
-		this.nextElement = null;
-		return out;
-	}
-	
-
-	@Override
-	public void remove() {
-		throw new UnsupportedOperationException();
-	}
-	
-
-	public void reset() {
-		// a reset always goes to the read phase
-		this.readPhase = true;
-		super.reset();
-	}
-	
-
-	@Override
-	public boolean nextBlock() throws IOException {
-		// check the state
-		if (this.closed) {
-			throw new IllegalStateException("Iterator has been closed.");
-		}
-		
-		// check whether more blocks are available
-		if (this.noMoreBlocks) {
-			return false;
-		}
-		
-		// reset the views in the superclass
-		super.nextBlock();
-		
-		T next = this.leftOverElement;
-		this.leftOverElement = null;
-		if (next == null) {
-			if (this.input.hasNext()) {
-				next = this.input.next();
-			}
-			else {
-				this.noMoreBlocks = true;
-				return false;
-			}
-		}
-		
-		// write the leftover record
-		if (!writeNextRecord(next)) {
-			throw new IOException("BlockResettableIterator could not serialize record into fresh memory block: " +
-					"Record is too large.");
-		}
-		
-		this.nextElement = next;
-		this.readPhase = false;
-		
-		return true;
-	}
-	
-	/**
-	 * Checks, whether the input that is blocked by this iterator, has further elements
-	 * available. This method may be used to forecast (for example at the point where a
-	 * block is full) whether there will be more data (possibly in another block).
-	 * 
-	 * @return True, if there will be more data, false otherwise.
-	 */
-	public boolean hasFurtherInput() {
-		return !this.noMoreBlocks; 
-	}
-	
-
-	public void close() {
-		// suggest that we are in the read phase. because nothing is in the current block,
-		// read requests will fail
-		this.readPhase = true;
-		super.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java
new file mode 100644
index 0000000..9d581ce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIterator.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.resettable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.ResettableIterator;
+
+/**
+ * Implementation of an iterator that fetches a block of data into main memory and offers resettable
+ * access to the data in that block. Records are read back from the block without a reuse object;
+ * see {@link ReusingBlockResettableIterator} for the variant that passes a reuse instance instead.
+ */
+public class NonReusingBlockResettableIterator<T> extends AbstractBlockResettableIterator<T> implements ResettableIterator<T> {
+	
+	public static final Logger LOG = LoggerFactory.getLogger(NonReusingBlockResettableIterator.class);
+	
+	// ------------------------------------------------------------------------
+	
+	protected Iterator<T> input;
+	/** Element to be returned by the next call to next(); null if it has not been fetched yet. */
+	protected T nextElement;
+	/** Input element that did not fit into the block being filled; nextBlock() writes it first. */
+	protected T leftOverElement;
+	/** Set by reset()/close(), cleared by nextBlock(): if set, hasNext() reads from the block, not the input. */
+	protected boolean readPhase;
+	/** Set once the input is found to be exhausted; nextBlock() then returns false. */
+	protected boolean noMoreBlocks;
+	
+	// ------------------------------------------------------------------------
+	
+	public NonReusingBlockResettableIterator(MemoryManager memoryManager, Iterator<T> input,
+			TypeSerializer<T> serializer, int numPages,
+			AbstractInvokable ownerTask)
+	throws MemoryAllocationException
+	{
+		this(memoryManager, serializer, numPages, ownerTask);
+		this.input = input;
+	}
+	
+	public NonReusingBlockResettableIterator(MemoryManager memoryManager, TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask)
+	throws MemoryAllocationException
+	{
+		super(serializer, memoryManager, numPages, ownerTask);
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	public void reopen(Iterator<T> input) throws IOException {
+		this.input = input;
+		
+		this.noMoreBlocks = false;
+		this.closed = false;
+		
+		nextBlock();
+	}
+
+	@Override
+	public boolean hasNext() {
+		try {
+			if (this.nextElement == null) {
+				if (this.readPhase) {
+					// read phase, get next element from buffer
+					T tmp = getNextRecord();
+					if (tmp != null) {
+						this.nextElement = tmp;
+						return true;
+					} else {
+						return false;
+					}
+				} else { // write phase: append the next input element to the current block
+					if (this.input.hasNext()) {
+						final T next = this.input.next();
+						if (writeNextRecord(next)) {
+							this.nextElement = next;
+							return true;
+						} else { // did not fit: keep it as the first element of the next block
+							this.leftOverElement = next;
+							return false;
+						}
+					} else { // input exhausted
+						this.noMoreBlocks = true;
+						return false;
+					}
+				}
+			} else {
+				return true;
+			}
+		} catch (IOException ioex) {
+			throw new RuntimeException("Error (de)serializing record in block resettable iterator.", ioex);
+		}
+	}
+	
+	/** Returns the element fetched by hasNext(), fetching it first if necessary. */
+	@Override
+	public T next() {
+		if (this.nextElement == null) {
+			if (!hasNext()) {
+				throw new NoSuchElementException();
+			}
+		}
+		
+		T out = this.nextElement;
+		this.nextElement = null;
+		return out;
+	}
+	
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+	
+	@Override
+	public void reset() {
+		// a reset always goes to the read phase
+		this.readPhase = true;
+		super.reset();
+	}
+	
+	/** Starts a fresh block seeded with the left-over (or next) input element; false if none is left. */
+	@Override
+	public boolean nextBlock() throws IOException {
+		// check the state
+		if (this.closed) {
+			throw new IllegalStateException("Iterator has been closed.");
+		}
+		
+		// check whether more blocks are available
+		if (this.noMoreBlocks) {
+			return false;
+		}
+		
+		// reset the views in the superclass
+		super.nextBlock();
+		
+		T next = this.leftOverElement;
+		this.leftOverElement = null;
+		if (next == null) {
+			if (this.input.hasNext()) {
+				next = this.input.next();
+			}
+			else {
+				this.noMoreBlocks = true;
+				return false;
+			}
+		}
+		
+		// write the leftover record
+		if (!writeNextRecord(next)) {
+			throw new IOException("BlockResettableIterator could not serialize record into fresh memory block: " +
+					"Record is too large.");
+		}
+		
+		this.nextElement = next;
+		this.readPhase = false;
+		
+		return true;
+	}
+	
+	/**
+	 * Checks whether the input consumed by this iterator has further elements available.
+	 * This may be used to forecast (for example when a block is full) whether there will be
+	 * more data, possibly in another block. It stays true until the input is found exhausted.
+	 * 
+	 * @return False once the input has been found to be exhausted, true otherwise.
+	 */
+	public boolean hasFurtherInput() {
+		return !this.noMoreBlocks; 
+	}
+	
+	@Override
+	public void close() {
+		// pretend to be in the read phase: since nothing is in the current block,
+		// read requests will fail instead of pulling from the input
+		this.readPhase = true;
+		super.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java
new file mode 100644
index 0000000..baa0fb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/ReusingBlockResettableIterator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.resettable;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Implementation of an iterator that fetches a block of data into main memory and offers resettable
+ * access to the data in that block. Unlike {@link NonReusingBlockResettableIterator}, records are read
+ * back from the block using a single reuse instance, so callers should not retain returned elements.
+ */
+public class ReusingBlockResettableIterator<T> extends NonReusingBlockResettableIterator<T> {
+
+	public static final Logger LOG = LoggerFactory.getLogger(ReusingBlockResettableIterator.class);
+	/** Created once from the serializer and passed to getNextRecord(T) for every record read back. */
+	private final T reuseElement;
+
+	// ------------------------------------------------------------------------
+
+	public ReusingBlockResettableIterator(MemoryManager memoryManager, Iterator<T> input,
+			TypeSerializer<T> serializer, int numPages,
+			AbstractInvokable ownerTask)
+	throws MemoryAllocationException
+	{
+		this(memoryManager, serializer, numPages, ownerTask);
+		this.input = input;
+	}
+
+	public ReusingBlockResettableIterator(MemoryManager memoryManager, TypeSerializer<T>
+			serializer, int numPages, AbstractInvokable ownerTask)
+	throws MemoryAllocationException
+	{
+		super(memoryManager, serializer, numPages, ownerTask);
+		
+		this.reuseElement = serializer.createInstance();
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Checks whether another element is available, fetching it if necessary.
+	 * <p>
+	 * Behaves like {@link NonReusingBlockResettableIterator#hasNext()}, except that in the read
+	 * phase records are fetched from the block by passing the reuse instance. The write phase,
+	 * and the case where an element is already pending, are delegated to the superclass.
+	 *
+	 * @return true if an element is pending for {@link #next()}; false when the current block is
+	 *         exhausted (read phase), or when the input is exhausted or the next input element did
+	 *         not fit into the block (write phase).
+	 * @throws RuntimeException if a record cannot be (de)serialized.
+	 */
+	@Override
+	public boolean hasNext() {
+		if (this.nextElement != null || !this.readPhase) {
+			// element already pending, or write phase: identical to the non-reusing iterator
+			return super.hasNext();
+		}
+
+		// read phase: fetch the next record of the current block, passing the reuse instance
+		final T record;
+		try {
+			record = getNextRecord(this.reuseElement);
+		} catch (IOException ioex) {
+			throw new RuntimeException("Error (de)serializing record in block resettable iterator.", ioex);
+		}
+
+		if (record == null) {
+			// no further record in the current block
+			return false;
+		}
+
+		this.nextElement = record;
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
deleted file mode 100644
index 675758a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.BlockResettableIterator;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * An implementation of the {@link JoinTaskIterator} that realizes the
- * matching through a sort-merge join strategy.
- */
-public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
-	
-	/**
-	 * The log used by this iterator to log messages.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(MergeMatchIterator.class);
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private TypePairComparator<T1, T2> comp;
-	
-	private ReusingKeyGroupedIterator<T1> iterator1;
-
-	private ReusingKeyGroupedIterator<T2> iterator2;
-	
-	private final TypeSerializer<T1> serializer1;
-	
-	private final TypeSerializer<T2> serializer2;
-	
-	private T1 copy1;
-	
-	private T1 spillHeadCopy;
-	
-	private T2 copy2;
-	
-	private T2 blockHeadCopy;
-	
-	private final BlockResettableIterator<T2> blockIt;				// for N:M cross products with same key
-	
-	private final List<MemorySegment> memoryForSpillingIterator;
-	
-	private final MemoryManager memoryManager;
-
-	private final IOManager ioManager;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public MergeMatchIterator(MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
-			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
-			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, TypePairComparator<T1, T2> pairComparator,
-			MemoryManager memoryManager, IOManager ioManager, int numMemoryPages, AbstractInvokable parentTask)
-	throws MemoryAllocationException
-	{
-		if (numMemoryPages < 2) {
-			throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
-		}
-		
-		this.comp = pairComparator;
-		this.serializer1 = serializer1;
-		this.serializer2 = serializer2;
-		
-		this.copy1 = serializer1.createInstance();
-		this.spillHeadCopy = serializer1.createInstance();
-		this.copy2 = serializer2.createInstance();
-		this.blockHeadCopy = serializer2.createInstance();
-		
-		this.memoryManager = memoryManager;
-		this.ioManager = ioManager;
-		
-		this.iterator1 = new ReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate());
-		this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate());
-		
-		final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
-		this.blockIt = new BlockResettableIterator<T2>(this.memoryManager, this.serializer2,
-			(numMemoryPages - numPagesForSpiller), parentTask);
-		this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
-	}
-
-
-	@Override
-	public void open() throws IOException {}
-
-
-	@Override
-	public void close() {
-		if (this.blockIt != null) {
-			try {
-				this.blockIt.close();
-			}
-			catch (Throwable t) {
-				LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
-			}
-		}
-		
-		this.memoryManager.release(this.memoryForSpillingIterator);
-	}
-	
-
-	@Override
-	public void abort() {
-		close();
-	}
-
-	/**
-	 * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come 
-	 * from different inputs. The output of the <code>match()</code> method is forwarded.
-	 * <p>
-	 * This method first zig-zags between the two sorted inputs in order to find a common
-	 * key, and then calls the match stub with the cross product of the values.
-	 * 
-	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
-	 * 
-	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(FlatJoinFunction, Collector)
-	 */
-	@Override
-	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
-			// consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
-			while (this.iterator1.nextKey());
-			while (this.iterator2.nextKey());
-			
-			return false;
-		}
-
-		final TypePairComparator<T1, T2> comparator = this.comp;
-		comparator.setReference(this.iterator1.getCurrent());
-		T2 current2 = this.iterator2.getCurrent();
-				
-		// zig zag
-		while (true) {
-			// determine the relation between the (possibly composite) keys
-			final int comp = comparator.compareToReference(current2);
-			
-			if (comp == 0) {
-				break;
-			}
-			
-			if (comp < 0) {
-				if (!this.iterator2.nextKey()) {
-					return false;
-				}
-				current2 = this.iterator2.getCurrent();
-			}
-			else {
-				if (!this.iterator1.nextKey()) {
-					return false;
-				}
-				comparator.setReference(this.iterator1.getCurrent());
-			}
-		}
-		
-		// here, we have a common key! call the match function with the cross product of the
-		// values
-		final ReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues();
-		final ReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues();
-		
-		final T1 firstV1 = values1.next();
-		final T2 firstV2 = values2.next();	
-			
-		final boolean v1HasNext = values1.hasNext();
-		final boolean v2HasNext = values2.hasNext();
-
-		// check if one side is already empty
-		// this check could be omitted if we put this in MatchTask.
-		// then we can derive the local strategy (with build side).
-		
-		if (v1HasNext) {
-			if (v2HasNext) {
-				// both sides contain more than one value
-				// TODO: Decide which side to spill and which to block!
-				crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
-			} else {
-				crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
-			}
-		} else {
-			if (v2HasNext) {
-				crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
-			} else {
-				// both sides contain only one value
-				matchFunction.join(firstV1, firstV2, collector);
-			}
-		}
-		return true;
-	}
-
-	/**
-	 * Crosses a single value from the first input with N values, all sharing a common key.
-	 * Effectively realizes a <i>1:N</i> match (join).
-	 * 
-	 * @param val1 The value form the <i>1</i> side.
-	 * @param firstValN The first of the values from the <i>N</i> side.
-	 * @param valsN Iterator over remaining <i>N</i> side values.
-	 *          
-	 * @throws Exception Forwards all exceptions thrown by the stub.
-	 */
-	private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-			final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		this.copy1 = this.serializer1.copy(val1, this.copy1);
-		matchFunction.join(this.copy1, firstValN, collector);
-		
-		// set copy and match first element
-		boolean more = true;
-		do {
-			final T2 nRec = valsN.next();
-			
-			if (valsN.hasNext()) {
-				this.copy1 = this.serializer1.copy(val1, this.copy1);
-				matchFunction.join(this.copy1, nRec, collector);
-			} else {
-				matchFunction.join(val1, nRec, collector);
-				more = false;
-			}
-		}
-		while (more);
-	}
-	
-	/**
-	 * Crosses a single value from the second side with N values, all sharing a common key.
-	 * Effectively realizes a <i>N:1</i> match (join).
-	 * <p>
-	 * Every pair except the last one receives a copy of {@code val1} (written into the reuse
-	 * instance {@code this.copy2}); the last pair receives {@code val1} itself.
-	 * 
-	 * @param val1 The single value from the second input (the <i>1</i> side).
-	 * @param firstValN The first of the values from the <i>N</i> side (the first input).
-	 * @param valsN Iterator over remaining <i>N</i> side values. The do-while loop calls
-	 *              {@code next()} before {@code hasNext()}, so it must contain at least one element.
-	 * @param matchFunction The join function called for every pair.
-	 * @param collector The collector passed to the join function.
-	 *          
-	 * @throws Exception Forwards all exceptions thrown by the stub.
-	 */
-	private void crossSecond1withNValues(T2 val1, T1 firstValN,
-			Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
-	throws Exception
-	{
-		this.copy2 = this.serializer2.copy(val1, this.copy2);
-		matchFunction.join(firstValN, this.copy2, collector);
-		
-		// match the remaining N-side values; val1 itself is only handed out for the last pair
-		boolean more = true;
-		do {
-			final T1 nRec = valsN.next();
-			
-			if (valsN.hasNext()) {
-				// re-copy for every pair, presumably because the join function may modify its input
-				this.copy2 = this.serializer2.copy(val1, this.copy2);
-				matchFunction.join(nRec,this.copy2,collector);
-			} else {
-				matchFunction.join(nRec, val1, collector);
-				more = false;
-			}
-		}
-		while (more);
-	}
-	
-	/**
-	 * Crosses M values of the first input with N values of the second input, all sharing a common
-	 * key (an <i>M:N</i> match), with a block-nested-loops join. The second input is read block-wise
-	 * through {@code this.blockIt}. The first input is wrapped in a {@link SpillingResettableIterator}
-	 * only if the second input does not fit into a single block.
-	 * 
-	 * @param firstV1 The first value of the first input (the spilling side).
-	 * @param spillVals Iterator over the remaining values of the first input.
-	 * @param firstV2 The first value of the second input (the blocking side).
-	 * @param blockVals Iterator over the remaining values of the second input.
-	 * @param matchFunction The join function called for every pair.
-	 * @param collector The collector passed to the join function.
-	 * 
-	 * @throws Exception Forwards all exceptions from the join function and the I/O system.
-	 */
-	private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
-			final T2 firstV2, final Iterator<T2> blockVals,
-			final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		// ==================================================
-		// We have one first (head) element from both inputs (firstV1 and firstV2)
-		// We have an iterator for both inputs.
-		// we make the V1 side the spilling side and the V2 side the blocking side.
-		// In order to get the full cross product without unnecessary spilling, we do the
-		// following:
-		// 1) cross the heads
-		// 2) cross the head of the spilling side against the first block of the blocking side
-		// 3) cross the iterator of the spilling side with the head of the block side
-		// 4) cross the iterator of the spilling side with the first block
-		// ---------------------------------------------------
-		// If the blocking side has more than one block, we really need to make the spilling side fully
-		// resettable. For each further block on the block side, we do:
-		// 5) cross the head of the spilling side with the next block
-		// 6) cross the spilling iterator with the next block.
-		
-		// match the first values first
-		// blockHeadCopy keeps the head of the second input for step 3; presumably firstV2 may be
-		// overwritten once the (object-reusing) blockVals iterator advances
-		this.copy1 = this.serializer1.copy(firstV1, this.copy1);
-		this.blockHeadCopy = this.serializer2.copy(firstV2, this.blockHeadCopy);
-		
-		// --------------- 1) Cross the heads -------------------
-		matchFunction.join(this.copy1, firstV2, collector);
-		
-		// for the remaining values, we do a block-nested-loops join
-		SpillingResettableIterator<T1> spillIt = null;
-		
-		try {
-			// create block iterator on the second input
-			this.blockIt.reopen(blockVals);
-			
-			// ------------- 2) cross the head of the spilling side with the first block ------------------
-			while (this.blockIt.hasNext()) {
-				final T2 nextBlockRec = this.blockIt.next();
-				this.copy1 = this.serializer1.copy(firstV1, this.copy1);
-				matchFunction.join(this.copy1, nextBlockRec, collector);
-			}
-			this.blockIt.reset();
-			
-			// spilling is required if the blocked input has data beyond the current block.
-			// in that case, create the spilling iterator
-			final Iterator<T1> leftSideIter;
-			final boolean spillingRequired = this.blockIt.hasFurtherInput();
-			if (spillingRequired)
-			{
-				// more data than would fit into one block. we need to wrap the other side in a spilling iterator
-				// create spilling iterator on first input
-				spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1,
-						this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
-				leftSideIter = spillIt;
-				spillIt.open();
-				
-				// keep the spilling head for step 5, which runs after spillVals has been advanced
-				this.spillHeadCopy = this.serializer1.copy(firstV1, this.spillHeadCopy);
-			}
-			else {
-				leftSideIter = spillVals;
-			}
-			
-			// cross the values in the v1 iterator against the current block
-			
-			while (leftSideIter.hasNext()) {
-				final T1 nextSpillVal = leftSideIter.next();
-				this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-				
-				
-				// -------- 3) cross the iterator of the spilling side with the head of the block side --------
-				this.copy2 = this.serializer2.copy(this.blockHeadCopy, this.copy2);
-				matchFunction.join(this.copy1, this.copy2, collector);
-				
-				// -------- 4) cross the iterator of the spilling side with the first block --------
-				while (this.blockIt.hasNext()) {
-					T2 nextBlockRec = this.blockIt.next();
-					
-					// get instances of key and block value
-					// (re-copied per pair, presumably because the join function may modify its input)
-					this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-					matchFunction.join(this.copy1, nextBlockRec, collector);
-				}
-				// reset block iterator
-				this.blockIt.reset();
-			}
-			
-			// if everything from the block-side fit into a single block, we are done.
-			// note that in this special case, we did not create a spilling iterator at all
-			if (!spillingRequired) {
-				return;
-			}
-			
-			// here we are, because we have more blocks on the block side
-			// loop as long as there are blocks from the blocked input
-			while (this.blockIt.nextBlock())
-			{
-				// rewind the spilling iterator
-				spillIt.reset();
-				
-				// ------------- 5) cross the head of the spilling side with the next block ------------
-				while (this.blockIt.hasNext()) {
-					this.copy1 = this.serializer1.copy(this.spillHeadCopy, this.copy1);
-					final T2 nextBlockVal = blockIt.next();
-					matchFunction.join(this.copy1, nextBlockVal, collector);
-				}
-				this.blockIt.reset();
-				
-				// -------- 6) cross the spilling iterator with the next block. ------------------
-				while (spillIt.hasNext())
-				{
-					// get value from resettable iterator
-					final T1 nextSpillVal = spillIt.next();
-					// cross value with block values
-					while (this.blockIt.hasNext()) {
-						// get instances of key and block value
-						final T2 nextBlockVal = this.blockIt.next();
-						this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-						matchFunction.join(this.copy1, nextBlockVal, collector);
-					}
-					
-					// reset block iterator
-					this.blockIt.reset();
-				}
-				// reset v1 iterator
-				// NOTE(review): the loop head resets spillIt again before its next use, so this call looks redundant
-				spillIt.reset();
-			}
-		}
-		finally {
-			// hand the spilling iterator's memory back to the pool reserved for later N:M matches
-			if (spillIt != null) {
-				this.memoryForSpillingIterator.addAll(spillIt.close());
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
new file mode 100644
index 0000000..70b6f9a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
+import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * matching through a sort-merge join strategy.
+ */
+public class NonReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
+
+	/**
+	 * The log used by this iterator to log messages.
+	 */
+	private static final Logger LOG = LoggerFactory.getLogger(NonReusingMergeMatchIterator.class);
+
+	// --------------------------------------------------------------------------------------------
+
+	private TypePairComparator<T1, T2> comp;
+
+	private NonReusingKeyGroupedIterator<T1> iterator1;
+
+	private NonReusingKeyGroupedIterator<T2> iterator2;
+
+	private final TypeSerializer<T1> serializer1;
+
+	private final TypeSerializer<T2> serializer2;
+
+	private final NonReusingBlockResettableIterator<T2> blockIt;				// for N:M cross products with same key
+
+	private final List<MemorySegment> memoryForSpillingIterator;
+
+	private final MemoryManager memoryManager;
+
+	private final IOManager ioManager;
+
+	// --------------------------------------------------------------------------------------------
+
+	public NonReusingMergeMatchIterator(
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+	throws MemoryAllocationException
+	{
+		if (numMemoryPages < 2) {
+			throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
+		}
+
+		this.comp = pairComparator;
+		this.serializer1 = serializer1;
+		this.serializer2 = serializer2;
+
+		this.memoryManager = memoryManager;
+		this.ioManager = ioManager;
+
+		this.iterator1 = new NonReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate());
+		this.iterator2 = new NonReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate());
+
+		final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
+		this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
+			(numMemoryPages - numPagesForSpiller), parentTask);
+		this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
+	}
+
+
+	@Override
+	public void open() throws IOException {}
+
+
+	@Override
+	public void close() {
+		if (this.blockIt != null) {
+			try {
+				this.blockIt.close();
+			}
+			catch (Throwable t) {
+				LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
+			}
+		}
+
+		this.memoryManager.release(this.memoryForSpillingIterator);
+	}
+
+
+	@Override
+	public void abort() {
+		close();
+	}
+
+	/**
+	 * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come
+	 * from different inputs. The output of the <code>match()</code> method is forwarded.
+	 * <p>
+	 * This method first zig-zags between the two sorted inputs in order to find a common
+	 * key, and then calls the match stub with the cross product of the values.
+	 *
+	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
+	 *
+	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
+	 */
+	@Override
+	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+	throws Exception
+	{
+		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
+			// consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
+			while (this.iterator1.nextKey());
+			while (this.iterator2.nextKey());
+			
+			return false;
+		}
+
+		final TypePairComparator<T1, T2> comparator = this.comp;
+		comparator.setReference(this.iterator1.getCurrent());
+		T2 current2 = this.iterator2.getCurrent();
+				
+		// zig zag
+		while (true) {
+			// determine the relation between the (possibly composite) keys
+			final int comp = comparator.compareToReference(current2);
+			
+			if (comp == 0) {
+				break;
+			}
+			
+			if (comp < 0) {
+				if (!this.iterator2.nextKey()) {
+					return false;
+				}
+				current2 = this.iterator2.getCurrent();
+			}
+			else {
+				if (!this.iterator1.nextKey()) {
+					return false;
+				}
+				comparator.setReference(this.iterator1.getCurrent());
+			}
+		}
+		
+		// here, we have a common key! call the match function with the cross product of the
+		// values
+		final NonReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues();
+		final NonReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues();
+		
+		final T1 firstV1 = values1.next();
+		final T2 firstV2 = values2.next();	
+			
+		final boolean v1HasNext = values1.hasNext();
+		final boolean v2HasNext = values2.hasNext();
+
+		// check if one side is already empty
+		// this check could be omitted if we put this in MatchTask.
+		// then we can derive the local strategy (with build side).
+		
+		if (v1HasNext) {
+			if (v2HasNext) {
+				// both sides contain more than one value
+				// TODO: Decide which side to spill and which to block!
+				crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
+			} else {
+				crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
+			}
+		} else {
+			if (v2HasNext) {
+				crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
+			} else {
+				// both sides contain only one value
+				matchFunction.join(firstV1, firstV2, collector);
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Crosses a single value from the first input with N values, all sharing a common key.
+	 * Effectively realizes a <i>1:N</i> match (join).
+	 * 
+	 * @param val1 The value form the <i>1</i> side.
+	 * @param firstValN The first of the values from the <i>N</i> side.
+	 * @param valsN Iterator over remaining <i>N</i> side values.
+	 *          
+	 * @throws Exception Forwards all exceptions thrown by the stub.
+	 */
+	private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
+			final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+	throws Exception
+	{
+		T1 copy1 = this.serializer1.copy(val1);
+		matchFunction.join(copy1, firstValN, collector);
+		
+		// set copy and match first element
+		boolean more = true;
+		do {
+			final T2 nRec = valsN.next();
+			
+			if (valsN.hasNext()) {
+				copy1 = this.serializer1.copy(val1);
+				matchFunction.join(copy1, nRec, collector);
+			} else {
+				matchFunction.join(val1, nRec, collector);
+				more = false;
+			}
+		}
+		while (more);
+	}
+	
+	/**
+	 * Crosses a single value from the second side with N values, all sharing a common key.
+	 * Effectively realizes a <i>N:1</i> match (join).
+	 * 
+	 * @param val1 The value form the <i>1</i> side.
+	 * @param firstValN The first of the values from the <i>N</i> side.
+	 * @param valsN Iterator over remaining <i>N</i> side values.
+	 *          
+	 * @throws Exception Forwards all exceptions thrown by the stub.
+	 */
+	private void crossSecond1withNValues(T2 val1, T1 firstValN,
+			Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
+	throws Exception
+	{
+		T2 copy2 = this.serializer2.copy(val1);
+		matchFunction.join(firstValN, copy2, collector);
+		
+		// set copy and match first element
+		boolean more = true;
+		do {
+			final T1 nRec = valsN.next();
+			
+			if (valsN.hasNext()) {
+				copy2 = this.serializer2.copy(val1);
+				matchFunction.join(nRec, copy2, collector);
+			} else {
+				matchFunction.join(nRec, val1, collector);
+				more = false;
+			}
+		}
+		while (more);
+	}
+	
+	/**
+	 * @param firstV1
+	 * @param spillVals
+	 * @param firstV2
+	 * @param blockVals
+	 */
+	private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
+			final T2 firstV2, final Iterator<T2> blockVals,
+			final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+	throws Exception
+	{
+		// ==================================================
+		// We have one first (head) element from both inputs (firstV1 and firstV2)
+		// We have an iterator for both inputs.
+		// we make the V1 side the spilling side and the V2 side the blocking side.
+		// In order to get the full cross product without unnecessary spilling, we do the
+		// following:
+		// 1) cross the heads
+		// 2) cross the head of the spilling side against the first block of the blocking side
+		// 3) cross the iterator of the spilling side with the head of the block side
+		// 4) cross the iterator of the spilling side with the first block
+		// ---------------------------------------------------
+		// If the blocking side has more than one block, we really need to make the spilling side fully
+		// resettable. For each further block on the block side, we do:
+		// 5) cross the head of the spilling side with the next block
+		// 6) cross the spilling iterator with the next block.
+		
+		// match the first values first
+		T1 copy1 = this.serializer1.copy(firstV1);
+		T2 blockHeadCopy = this.serializer2.copy(firstV2);
+		T1 spillHeadCopy = null;
+
+
+		// --------------- 1) Cross the heads -------------------
+		matchFunction.join(copy1, firstV2, collector);
+		
+		// for the remaining values, we do a block-nested-loops join
+		SpillingResettableIterator<T1> spillIt = null;
+		
+		try {
+			// create block iterator on the second input
+			this.blockIt.reopen(blockVals);
+			
+			// ------------- 2) cross the head of the spilling side with the first block ------------------
+			while (this.blockIt.hasNext()) {
+				final T2 nextBlockRec = this.blockIt.next();
+				copy1 = this.serializer1.copy(firstV1);
+				matchFunction.join(copy1, nextBlockRec, collector);
+			}
+			this.blockIt.reset();
+			
+			// spilling is required if the blocked input has data beyond the current block.
+			// in that case, create the spilling iterator
+			final Iterator<T1> leftSideIter;
+			final boolean spillingRequired = this.blockIt.hasFurtherInput();
+			if (spillingRequired)
+			{
+				// more data than would fit into one block. we need to wrap the other side in a spilling iterator
+				// create spilling iterator on first input
+				spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1,
+						this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
+				leftSideIter = spillIt;
+				spillIt.open();
+				
+				spillHeadCopy = this.serializer1.copy(firstV1);
+			}
+			else {
+				leftSideIter = spillVals;
+			}
+			
+			// cross the values in the v1 iterator against the current block
+			
+			while (leftSideIter.hasNext()) {
+				final T1 nextSpillVal = leftSideIter.next();
+				copy1 = this.serializer1.copy(nextSpillVal);
+				
+				
+				// -------- 3) cross the iterator of the spilling side with the head of the block side --------
+				T2 copy2 = this.serializer2.copy(blockHeadCopy);
+				matchFunction.join(copy1, copy2, collector);
+				
+				// -------- 4) cross the iterator of the spilling side with the first block --------
+				while (this.blockIt.hasNext()) {
+					T2 nextBlockRec = this.blockIt.next();
+					
+					// get instances of key and block value
+					copy1 = this.serializer1.copy(nextSpillVal);
+					matchFunction.join(copy1, nextBlockRec, collector);
+				}
+				// reset block iterator
+				this.blockIt.reset();
+			}
+			
+			// if everything from the block-side fit into a single block, we are done.
+			// note that in this special case, we did not create a spilling iterator at all
+			if (!spillingRequired) {
+				return;
+			}
+			
+			// here we are, because we have more blocks on the block side
+			// loop as long as there are blocks from the blocked input
+			while (this.blockIt.nextBlock())
+			{
+				// rewind the spilling iterator
+				spillIt.reset();
+				
+				// ------------- 5) cross the head of the spilling side with the next block ------------
+				while (this.blockIt.hasNext()) {
+					copy1 = this.serializer1.copy(spillHeadCopy);
+					final T2 nextBlockVal = blockIt.next();
+					matchFunction.join(copy1, nextBlockVal, collector);
+				}
+				this.blockIt.reset();
+				
+				// -------- 6) cross the spilling iterator with the next block. ------------------
+				while (spillIt.hasNext())
+				{
+					// get value from resettable iterator
+					final T1 nextSpillVal = spillIt.next();
+					// cross value with block values
+					while (this.blockIt.hasNext()) {
+						// get instances of key and block value
+						final T2 nextBlockVal = this.blockIt.next();
+						copy1 = this.serializer1.copy(nextSpillVal);
+						matchFunction.join(copy1, nextBlockVal, collector);
+					}
+					
+					// reset block iterator
+					this.blockIt.reset();
+				}
+				// reset v1 iterator
+				spillIt.reset();
+			}
+		}
+		finally {
+			if (spillIt != null) {
+				this.memoryForSpillingIterator.addAll(spillIt.close());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
new file mode 100644
index 0000000..66beee1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
+import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+/**
+ * An implementation of the {@link JoinTaskIterator} that realizes the
+ * matching through a sort-merge join strategy.
+ */
+public class ReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
+	
+	/**
+	 * The log used by this iterator to log messages.
+	 */
+	private static final Logger LOG = LoggerFactory.getLogger(ReusingMergeMatchIterator.class);
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private final TypePairComparator<T1, T2> comp;
+	
+	private final ReusingKeyGroupedIterator<T1> iterator1;
+
+	private final ReusingKeyGroupedIterator<T2> iterator2;
+	
+	private final TypeSerializer<T1> serializer1;
+	
+	private final TypeSerializer<T2> serializer2;
+	
+	// Reuse targets for TypeSerializer#copy(from, reuse). They are not final because every copy call
+	// re-assigns the field with the instance that copy returns.
+	private T1 copy1;
+	
+	private T1 spillHeadCopy;			// copy of the first value of the spilling (first) input in N:M matches
+	
+	private T2 copy2;
+	
+	private T2 blockHeadCopy;			// copy of the first value of the blocking (second) input in N:M matches
+	
+	private final NonReusingBlockResettableIterator<T2> blockIt;				// for N:M cross products with same key
+	
+	private final List<MemorySegment> memoryForSpillingIterator;			// pages for the spilling side of N:M matches
+	
+	private final MemoryManager memoryManager;
+
+	private final IOManager ioManager;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a merge-match iterator over two inputs, which the zig-zag merge assumes to be sorted on their keys.
+	 * The reuse instances for all copies are created here once.
+	 * <p>
+	 * Of the {@code numMemoryPages}, one page (two, if more than 20 pages are given) is reserved for the spilling
+	 * iterator used in N:M matches; the remaining pages are given to the block iterator over the second input.
+	 *
+	 * @throws MemoryAllocationException If the memory manager cannot provide the requested pages.
+	 * @throws IllegalArgumentException If fewer than two memory pages are given.
+	 */
+	public ReusingMergeMatchIterator(
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+	throws MemoryAllocationException
+	{
+		if (numMemoryPages < 2) {
+			throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
+		}
+		
+		this.comp = pairComparator;
+		this.serializer1 = serializer1;
+		this.serializer2 = serializer2;
+		
+		this.copy1 = serializer1.createInstance();
+		this.spillHeadCopy = serializer1.createInstance();
+		this.copy2 = serializer2.createInstance();
+		this.blockHeadCopy = serializer2.createInstance();
+		
+		this.memoryManager = memoryManager;
+		this.ioManager = ioManager;
+		
+		this.iterator1 = new ReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate());
+		this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate());
+		
+		final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
+		this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
+			(numMemoryPages - numPagesForSpiller), parentTask);
+		
+		// The block iterator has been constructed with its share of the pages. If the spilling pages cannot be
+		// allocated, close it before the exception propagates: nobody can call close() on a half-constructed
+		// iterator, so its memory would otherwise leak.
+		boolean spillingMemoryAllocated = false;
+		try {
+			this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
+			spillingMemoryAllocated = true;
+		}
+		finally {
+			if (!spillingMemoryAllocated) {
+				try {
+					this.blockIt.close();
+				}
+				catch (Throwable t) {
+					// only log, so that the original allocation failure is the exception that propagates
+					LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
+				}
+			}
+		}
+	}
+
+
+	@Override
+	public void open() throws IOException {}
+
+
+	@Override
+	public void close() {
+		if (this.blockIt != null) {
+			try {
+				this.blockIt.close();
+			}
+			catch (Throwable t) {
+				LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
+			}
+		}
+		
+		this.memoryManager.release(this.memoryForSpillingIterator);
+	}
+	
+
+	/**
+	 * Aborting needs no special handling; it performs the same cleanup as {@link #close()}.
+	 */
+	@Override
+	public void abort() {
+		this.close();
+	}
+
+	/**
+	 * Calls the <code>FlatJoinFunction#join()</code> method for all pairs of records that share the same key and
+	 * come from different inputs. The output of the <code>join()</code> method is forwarded to the collector.
+	 * <p>
+	 * This method first zig-zags between the two sorted inputs in order to find a common
+	 * key, and then calls the join function with the cross product of the values.
+	 * 
+	 * @param matchFunction The join function called for every pair of values with the common key.
+	 * @param collector The collector passed to the join function.
+	 * @return True, if a common key was found and its values were joined; false, if one of the inputs
+	 *         has no further key.
+	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
+	 * 
+	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(FlatJoinFunction, Collector)
+	 */
+	@Override
+	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+	throws Exception
+	{
+		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
+			// consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
+			while (this.iterator1.nextKey());
+			while (this.iterator2.nextKey());
+			
+			return false;
+		}
+
+		final TypePairComparator<T1, T2> comparator = this.comp;
+		comparator.setReference(this.iterator1.getCurrent());
+		T2 current2 = this.iterator2.getCurrent();
+				
+		// zig zag: depending on the comparison, advance one of the inputs until both current keys are equal
+		while (true) {
+			// determine the relation between the (possibly composite) keys
+			final int comp = comparator.compareToReference(current2);
+			
+			if (comp == 0) {
+				break;
+			}
+			
+			if (comp < 0) {
+				if (!this.iterator2.nextKey()) {
+					return false;
+				}
+				current2 = this.iterator2.getCurrent();
+			}
+			else {
+				if (!this.iterator1.nextKey()) {
+					return false;
+				}
+				comparator.setReference(this.iterator1.getCurrent());
+			}
+		}
+		
+		// here, we have a common key! call the match function with the cross product of the
+		// values
+		// NOTE(review): these are the object-reusing grouped iterators, so objects returned by next() may be
+		// reused by later calls; do not reorder the next()/hasNext() calls below without checking
+		// ReusingKeyGroupedIterator's reuse contract.
+		final ReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues();
+		final ReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues();
+		
+		final T1 firstV1 = values1.next();
+		final T2 firstV2 = values2.next();	
+			
+		final boolean v1HasNext = values1.hasNext();
+		final boolean v2HasNext = values2.hasNext();
+
+		// check if one side is already empty
+		// this check could be omitted if we put this in MatchTask.
+		// then we can derive the local strategy (with build side).
+		
+		if (v1HasNext) {
+			if (v2HasNext) {
+				// both sides contain more than one value
+				// TODO: Decide which side to spill and which to block!
+				crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
+			} else {
+				// N:1 - only the second input has a single value for this key
+				crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
+			}
+		} else {
+			if (v2HasNext) {
+				// 1:N - only the first input has a single value for this key
+				crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
+			} else {
+				// both sides contain only one value
+				matchFunction.join(firstV1, firstV2, collector);
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Crosses a single value from the first input with N values, all sharing a common key.
+	 * Effectively realizes a <i>1:N</i> match (join).
+	 * <p>
+	 * Every pair except the last one receives a copy of {@code val1} (written into the reuse
+	 * instance {@code this.copy1}); the last pair receives {@code val1} itself.
+	 * 
+	 * @param val1 The value from the <i>1</i> side.
+	 * @param firstValN The first of the values from the <i>N</i> side.
+	 * @param valsN Iterator over remaining <i>N</i> side values. The do-while loop calls
+	 *              {@code next()} before {@code hasNext()}, so it must contain at least one element.
+	 * @param matchFunction The join function called for every pair.
+	 * @param collector The collector passed to the join function.
+	 *          
+	 * @throws Exception Forwards all exceptions thrown by the stub.
+	 */
+	private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
+			final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+	throws Exception
+	{
+		this.copy1 = this.serializer1.copy(val1, this.copy1);
+		matchFunction.join(this.copy1, firstValN, collector);
+		
+		// match the remaining N-side values; val1 itself is only handed out for the last pair
+		boolean more = true;
+		do {
+			final T2 nRec = valsN.next();
+			
+			if (valsN.hasNext()) {
+				// re-copy for every pair, presumably because the join function may modify its input
+				this.copy1 = this.serializer1.copy(val1, this.copy1);
+				matchFunction.join(this.copy1, nRec, collector);
+			} else {
+				matchFunction.join(val1, nRec, collector);
+				more = false;
+			}
+		}
+		while (more);
+	}
+	
+	/**
+	 * Crosses a single value from the second side with N values, all sharing a common key.
+	 * Effectively realizes a <i>N:1</i> match (join).
+	 * <p>
+	 * Every pair except the last one receives a copy of {@code val1} (written into the reuse
+	 * instance {@code this.copy2}); the last pair receives {@code val1} itself.
+	 * 
+	 * @param val1 The single value from the second input (the <i>1</i> side).
+	 * @param firstValN The first of the values from the <i>N</i> side (the first input).
+	 * @param valsN Iterator over remaining <i>N</i> side values. The do-while loop calls
+	 *              {@code next()} before {@code hasNext()}, so it must contain at least one element.
+	 * @param matchFunction The join function called for every pair.
+	 * @param collector The collector passed to the join function.
+	 *          
+	 * @throws Exception Forwards all exceptions thrown by the stub.
+	 */
+	private void crossSecond1withNValues(T2 val1, T1 firstValN,
+			Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
+	throws Exception
+	{
+		this.copy2 = this.serializer2.copy(val1, this.copy2);
+		matchFunction.join(firstValN, this.copy2, collector);
+		
+		// match the remaining N-side values; val1 itself is only handed out for the last pair
+		boolean more = true;
+		do {
+			final T1 nRec = valsN.next();
+			
+			if (valsN.hasNext()) {
+				// re-copy for every pair, presumably because the join function may modify its input
+				this.copy2 = this.serializer2.copy(val1, this.copy2);
+				matchFunction.join(nRec,this.copy2,collector);
+			} else {
+				matchFunction.join(nRec, val1, collector);
+				more = false;
+			}
+		}
+		while (more);
+	}
+	
+	/**
+	 * Crosses M values from the first input with N values from the second input, all sharing a
+	 * common key. Effectively realizes an <i>M:N</i> match (join) as a block-nested-loops join.
+	 * <p>
+	 * The first input is the spilling side and the second input is the blocking side. If the
+	 * remaining values of the second input fit into the first block of {@code blockIt}, the values
+	 * of the first input are streamed exactly once and no spilling iterator is created. Otherwise
+	 * they are wrapped in a {@link SpillingResettableIterator}, so that they can be replayed for
+	 * every further block. The memory of that iterator is always added back to
+	 * {@code memoryForSpillingIterator}, also when an exception is thrown.
+	 * 
+	 * @param firstV1 The first (head) value of the first input.
+	 * @param spillVals Iterator over the remaining values of the first input (the spilling side).
+	 * @param firstV2 The first (head) value of the second input.
+	 * @param blockVals Iterator over the remaining values of the second input (the blocking side).
+	 * @param matchFunction The user-defined join function invoked for every pair.
+	 * @param collector The collector that receives the join results.
+	 * 
+	 * @throws Exception Forwards all exceptions thrown by the stub and by the block or spilling iterator.
+	 */
+	private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
+			final T2 firstV2, final Iterator<T2> blockVals,
+			final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+	throws Exception
+	{
+		// ==================================================
+		// We have one first (head) element from both inputs (firstV1 and firstV2)
+		// We have an iterator for both inputs.
+		// we make the V1 side the spilling side and the V2 side the blocking side.
+		// In order to get the full cross product without unnecessary spilling, we do the
+		// following:
+		// 1) cross the heads
+		// 2) cross the head of the spilling side against the first block of the blocking side
+		// 3) cross the iterator of the spilling side with the head of the block side
+		// 4) cross the iterator of the spilling side with the first block
+		// ---------------------------------------------------
+		// If the blocking side has more than one block, we really need to make the spilling side fully
+		// resettable. For each further block on the block side, we do:
+		// 5) cross the head of the spilling side with the next block
+		// 6) cross the spilling iterator with the next block.
+		
+		// hand the function a copy of firstV1 (firstV1 itself is read again below), and keep a
+		// copy of the block-side head for step 3)
+		this.copy1 = this.serializer1.copy(firstV1, this.copy1);
+		this.blockHeadCopy = this.serializer2.copy(firstV2, this.blockHeadCopy);
+		
+		// --------------- 1) Cross the heads -------------------
+		matchFunction.join(this.copy1, firstV2, collector);
+		
+		// for the remaining values, we do a block-nested-loops join
+		SpillingResettableIterator<T1> spillIt = null;
+		
+		try {
+			// create block iterator on the second input
+			this.blockIt.reopen(blockVals);
+			
+			// ------------- 2) cross the head of the spilling side with the first block ------------------
+			while (this.blockIt.hasNext()) {
+				final T2 nextBlockRec = this.blockIt.next();
+				this.copy1 = this.serializer1.copy(firstV1, this.copy1);
+				matchFunction.join(this.copy1, nextBlockRec, collector);
+			}
+			this.blockIt.reset();
+			
+			// spilling is required if the blocked input has data beyond the current block.
+			// in that case, create the spilling iterator
+			final Iterator<T1> leftSideIter;
+			final boolean spillingRequired = this.blockIt.hasFurtherInput();
+			if (spillingRequired)
+			{
+				// more data than would fit into one block. we need to wrap the other side in a spilling iterator
+				// create spilling iterator on first input
+				spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1,
+						this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
+				leftSideIter = spillIt;
+				spillIt.open();
+				
+				// the spilling-side head is needed again for every further block (step 5)
+				this.spillHeadCopy = this.serializer1.copy(firstV1, this.spillHeadCopy);
+			}
+			else {
+				leftSideIter = spillVals;
+			}
+			
+			// cross the values in the v1 iterator against the current block
+			while (leftSideIter.hasNext()) {
+				final T1 nextSpillVal = leftSideIter.next();
+				this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
+				
+				// -------- 3) cross the iterator of the spilling side with the head of the block side --------
+				this.copy2 = this.serializer2.copy(this.blockHeadCopy, this.copy2);
+				matchFunction.join(this.copy1, this.copy2, collector);
+				
+				// -------- 4) cross the iterator of the spilling side with the first block --------
+				while (this.blockIt.hasNext()) {
+					T2 nextBlockRec = this.blockIt.next();
+					
+					// hand a fresh copy of the spilling-side value to the function for every pair
+					this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
+					matchFunction.join(this.copy1, nextBlockRec, collector);
+				}
+				// reset block iterator
+				this.blockIt.reset();
+			}
+			
+			// if everything from the block-side fit into a single block, we are done.
+			// note that in this special case, we did not create a spilling iterator at all
+			if (!spillingRequired) {
+				return;
+			}
+			
+			// here we are, because we have more blocks on the block side
+			// loop as long as there are blocks from the blocked input
+			while (this.blockIt.nextBlock())
+			{
+				// rewind the spilling iterator. One reset per block suffices: the spilling side was
+				// fully consumed in steps 3) and 4), and step 6) consumes it fully before the next
+				// block, so no additional reset at the end of the loop body is needed.
+				spillIt.reset();
+				
+				// ------------- 5) cross the head of the spilling side with the next block ------------
+				while (this.blockIt.hasNext()) {
+					this.copy1 = this.serializer1.copy(this.spillHeadCopy, this.copy1);
+					final T2 nextBlockVal = blockIt.next();
+					matchFunction.join(this.copy1, nextBlockVal, collector);
+				}
+				this.blockIt.reset();
+				
+				// -------- 6) cross the spilling iterator with the next block. ------------------
+				while (spillIt.hasNext())
+				{
+					// get value from resettable iterator
+					final T1 nextSpillVal = spillIt.next();
+					// cross value with block values
+					while (this.blockIt.hasNext()) {
+						final T2 nextBlockVal = this.blockIt.next();
+						// hand a fresh copy of the spilling-side value to the function for every pair
+						this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
+						matchFunction.join(this.copy1, nextBlockVal, collector);
+					}
+					
+					// reset block iterator
+					this.blockIt.reset();
+				}
+			}
+		}
+		finally {
+			// give the spilling iterator's memory back, also if the join function or an iterator threw
+			if (spillIt != null) {
+				this.memoryForSpillingIterator.addAll(spillIt.close());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java
deleted file mode 100644
index c51e53a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/BlockResettableIteratorTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.resettable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.BlockResettableIterator;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-/**
- * Tests for {@link BlockResettableIterator}: walks through the input block by block, re-reads each
- * block five times after a reset, and checks that every pass returns the same consecutive values.
- * The three tests differ only in the buffer count passed to the iterator's constructor (1, 2, 12).
- */
-public class BlockResettableIteratorTest
-{
-	private static final int MEMORY_CAPACITY = 3 * 128 * 1024;
-	
-	private static final int NUM_VALUES = 20000;
-	
-	private MemoryManager memman;
-
-	private Iterator<Record> reader;
-
-	private List<Record> objects;
-	
-	private final TypeSerializer<Record> serializer = RecordSerializer.get();
-
-	@Before
-	public void startup() {
-		// set up the memory manager (no I/O manager is needed by these tests)
-		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1);
-		
-		// create test objects holding the values 0 .. NUM_VALUES-1
-		this.objects = new ArrayList<Record>(20000);
-		for (int i = 0; i < NUM_VALUES; ++i) {
-			this.objects.add(new Record(new IntValue(i)));
-		}
-		
-		// create the reader
-		this.reader = objects.iterator();
-	}
-	
-	@After
-	public void shutdown() {
-		this.objects = null;
-		
-		// check that the memory manager got all segments back
-		if (!this.memman.verifyEmpty()) {
-			Assert.fail("A memory leak has occurred: Not all memory was properly returned to the memory manager.");
-		}
-		
-		this.memman.shutdown();
-		this.memman = null;
-	}
-
-	@Test
-	public void testSerialBlockResettableIterator() throws Exception
-	{
-		final AbstractInvokable memOwner = new DummyInvokable();
-		// create the resettable Iterator
-		final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>(
-				this.memman, this.reader, this.serializer, 1, memOwner);
-		// open the iterator
-		iterator.open();
-		
-		// now test walking through the iterator
-		int lower = 0;
-		int upper = 0;
-		do {
-			// the next block starts where the previous one ended (the second assignment is a no-op)
-			lower = upper;
-			upper = lower;
-			// read the block once, checking that its values are consecutive, to find its upper bound
-			while (iterator.hasNext()) {
-				Record target = iterator.next();
-				int val = target.getField(0, IntValue.class).getValue();
-				Assert.assertEquals(upper++, val);
-			}
-			// now reset the buffer a few times; every pass must return the same values
-			for (int i = 0; i < 5; ++i) {
-				iterator.reset();
-				int count = 0;
-				while (iterator.hasNext()) {
-					Record target = iterator.next();
-					int val = target.getField(0, IntValue.class).getValue();
-					Assert.assertEquals(lower + (count++), val);
-				}
-				Assert.assertEquals(upper - lower, count);
-			}
-		} while (iterator.nextBlock());
-		Assert.assertEquals(NUM_VALUES, upper);
-		// close the iterator
-		iterator.close();
-	}
-
-	@Test
-	public void testDoubleBufferedBlockResettableIterator() throws Exception
-	{
-		final AbstractInvokable memOwner = new DummyInvokable();
-		// create the resettable Iterator
-		final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>(
-				this.memman, this.reader, this.serializer, 2, memOwner);
-		// open the iterator
-		iterator.open();
-		
-		// now test walking through the iterator
-		int lower = 0;
-		int upper = 0;
-		do {
-			// the next block starts where the previous one ended (the second assignment is a no-op)
-			lower = upper;
-			upper = lower;
-			// read the block once, checking that its values are consecutive, to find its upper bound
-			while (iterator.hasNext()) {
-				Record target = iterator.next();
-				int val = target.getField(0, IntValue.class).getValue();
-				Assert.assertEquals(upper++, val);
-			}
-			// now reset the buffer a few times; every pass must return the same values
-			for (int i = 0; i < 5; ++i) {
-				iterator.reset();
-				int count = 0;
-				while (iterator.hasNext()) {
-					Record target = iterator.next();
-					int val = target.getField(0, IntValue.class).getValue();
-					Assert.assertEquals(lower + (count++), val);
-				}
-				Assert.assertEquals(upper - lower, count);
-			}
-		} while (iterator.nextBlock());
-		Assert.assertEquals(NUM_VALUES, upper);
-		
-		// close the iterator
-		iterator.close();
-	}
-
-	@Test
-	public void testTwelveFoldBufferedBlockResettableIterator() throws Exception
-	{
-		final AbstractInvokable memOwner = new DummyInvokable();
-		// create the resettable Iterator
-		final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>(
-				this.memman, this.reader, this.serializer, 12, memOwner);
-		// open the iterator
-		iterator.open();
-		
-		// now test walking through the iterator
-		int lower = 0;
-		int upper = 0;
-		do {
-			// the next block starts where the previous one ended (the second assignment is a no-op)
-			lower = upper;
-			upper = lower;
-			// read the block once, checking that its values are consecutive, to find its upper bound
-			while (iterator.hasNext()) {
-				Record target = iterator.next();
-				int val = target.getField(0, IntValue.class).getValue();
-				Assert.assertEquals(upper++, val);
-			}
-			// now reset the buffer a few times; every pass must return the same values
-			for (int i = 0; i < 5; ++i) {
-				iterator.reset();
-				int count = 0;
-				while (iterator.hasNext()) {
-					Record target = iterator.next();
-					int val = target.getField(0, IntValue.class).getValue();
-					Assert.assertEquals(lower + (count++), val);
-				}
-				Assert.assertEquals(upper - lower, count);
-			}
-		} while (iterator.nextBlock());
-		Assert.assertEquals(NUM_VALUES, upper);
-		
-		// close the iterator
-		iterator.close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d529749c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
new file mode 100644
index 0000000..5641f29
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/NonReusingBlockResettableIteratorTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.resettable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link NonReusingBlockResettableIterator}: walks through the input block by block,
+ * re-reads each block several times after a reset, and checks that every pass returns the same
+ * consecutive values. The tests differ only in the buffer count passed to the iterator's
+ * constructor.
+ */
+public class NonReusingBlockResettableIteratorTest
+{
+	private static final int MEMORY_CAPACITY = 3 * 128 * 1024;
+	
+	private static final int NUM_VALUES = 20000;
+	
+	/** How often each block is re-read via reset() before advancing to the next block. */
+	private static final int NUM_RESETS_PER_BLOCK = 5;
+	
+	private MemoryManager memman;
+
+	private Iterator<Record> reader;
+
+	private List<Record> objects;
+	
+	private final TypeSerializer<Record> serializer = RecordSerializer.get();
+
+	@Before
+	public void startup() {
+		// set up the memory manager (no I/O manager is needed by these tests)
+		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1);
+		
+		// create test objects holding the values 0 .. NUM_VALUES-1
+		this.objects = new ArrayList<Record>(NUM_VALUES);
+		for (int i = 0; i < NUM_VALUES; ++i) {
+			this.objects.add(new Record(new IntValue(i)));
+		}
+		
+		// create the reader
+		this.reader = objects.iterator();
+	}
+	
+	@After
+	public void shutdown() {
+		this.objects = null;
+		
+		// check that the memory manager got all segments back
+		if (!this.memman.verifyEmpty()) {
+			Assert.fail("A memory leak has occurred: Not all memory was properly returned to the memory manager.");
+		}
+		
+		this.memman.shutdown();
+		this.memman = null;
+	}
+
+	@Test
+	public void testSerialBlockResettableIterator() throws Exception
+	{
+		runBlockWiseTraversal(1);
+	}
+
+	@Test
+	public void testDoubleBufferedBlockResettableIterator() throws Exception
+	{
+		runBlockWiseTraversal(2);
+	}
+
+	@Test
+	public void testTwelveFoldBufferedBlockResettableIterator() throws Exception
+	{
+		runBlockWiseTraversal(12);
+	}
+
+	/**
+	 * Opens a {@link NonReusingBlockResettableIterator} over {@link #reader} and walks through all of
+	 * its blocks. Each block is read once to find its bounds and then re-read
+	 * {@link #NUM_RESETS_PER_BLOCK} times after a reset; every pass must return the same consecutive
+	 * values. Finally checks that all {@link #NUM_VALUES} values were seen.
+	 * <p>
+	 * The iterator is closed in a {@code finally} block, so a failing assertion does not additionally
+	 * leak its memory and trigger a second failure in {@link #shutdown()}.
+	 *
+	 * @param numBuffers The buffer count passed to the iterator's constructor.
+	 * @throws Exception Forwards all exceptions thrown by the iterator.
+	 */
+	private void runBlockWiseTraversal(int numBuffers) throws Exception
+	{
+		final AbstractInvokable memOwner = new DummyInvokable();
+		// create and open the resettable iterator
+		final NonReusingBlockResettableIterator<Record> iterator = new NonReusingBlockResettableIterator<Record>(
+				this.memman, this.reader, this.serializer, numBuffers, memOwner);
+		iterator.open();
+		
+		try {
+			int upper = 0;
+			do {
+				// the current block starts where the previous one ended
+				final int lower = upper;
+				
+				// read the block once, checking that its values are consecutive, to find its upper bound
+				while (iterator.hasNext()) {
+					Record target = iterator.next();
+					int val = target.getField(0, IntValue.class).getValue();
+					Assert.assertEquals(upper++, val);
+				}
+				
+				// now reset the block a few times; every pass must return exactly the same values
+				for (int i = 0; i < NUM_RESETS_PER_BLOCK; ++i) {
+					iterator.reset();
+					int count = 0;
+					while (iterator.hasNext()) {
+						Record target = iterator.next();
+						int val = target.getField(0, IntValue.class).getValue();
+						Assert.assertEquals(lower + (count++), val);
+					}
+					Assert.assertEquals(upper - lower, count);
+				}
+			} while (iterator.nextBlock());
+			
+			Assert.assertEquals(NUM_VALUES, upper);
+		}
+		finally {
+			// always release the iterator's memory
+			iterator.close();
+		}
+	}
+
+}


Mime
View raw message