flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [05/13] flink git commit: [FLINK-1285] Make execution mode configurable
Date Thu, 08 Jan 2015 10:59:00 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
new file mode 100644
index 0000000..fd2f906
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.RecordMatch;
+import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.RecordMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
+import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Key;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test specialized hash join that keeps the build side data (in memory and on hard disk)
+ * This is used for iterative tasks.
+ */
+@SuppressWarnings("deprecation")
+public class ReusingReOpenableHashTableITCase {
+	
+	private static final int PAGE_SIZE = 8 * 1024;
+	private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
+
+	private static final long SEED1 = 561349061987311L;
+	private static final long SEED2 = 231434613412342L;
+	
+	private static final int NUM_PROBES = 3; // number of reopenings of hash join
+	
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+	
+	private TypeSerializer<Record> recordSerializer;
+	private TypeComparator<Record> record1Comparator;
+	private TypeComparator<Record> record2Comparator;
+	private TypePairComparator<Record, Record> recordPairComparator;
+	
+	
+	
+	
+	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+	private TypeSerializer<Record> recordBuildSideAccesssor;
+	private TypeSerializer<Record> recordProbeSideAccesssor;
+	private TypeComparator<Record> recordBuildSideComparator;
+	private TypeComparator<Record> recordProbeSideComparator;
+	private TypePairComparator<Record, Record> pactRecordComparator;
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void beforeTest()
+	{
+		this.recordSerializer = RecordSerializer.get();
+		
+		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		
+		
+		final int[] keyPos = new int[] {0};
+		final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
+		
+		this.recordBuildSideAccesssor = RecordSerializer.get();
+		this.recordProbeSideAccesssor = RecordSerializer.get();
+		this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
+		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
+		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
+		
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest()
+	{
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+		
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+	
+	
+	/**
+	 * Test behavior with overflow buckets (Overflow buckets must be initialized correctly 
+	 * if the input is reopened again)
+	 */
+	@Test
+	public void testOverflow() {
+		
+		int buildSize = 1000;
+		int probeSize = 1000;
+		try {
+			Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			
+			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
+			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			doTest(buildInput,probeInput, bgen, pgen);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	/**
+	 * Verify proper operation if the build side is spilled to disk.
+	 */
+	@Test
+	public void testDoubleProbeSpilling() {
+		
+		int buildSize = 1000;
+		int probeSize = 1000;
+		try {
+			Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			
+			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
+			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			doTest(buildInput,probeInput, bgen, pgen);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	/**
+	 * This test case verifies that hybrid hash join is able to handle multiple probe phases
+	 * when the build side fits completely into memory.
+	 */
+	@Test
+	public void testDoubleProbeInMemory() {
+		
+		int buildSize = 1000;
+		int probeSize = 1000;
+		try {
+			Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			
+			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
+			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			
+			doTest(buildInput,probeInput, bgen, pgen);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
+		// collect expected data
+		final Map<TestData.Key, Collection<RecordMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchRecordValues(ReusingHashMatchIteratorITCase.collectRecordData(buildInput), ReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+		
+		final List<Map<TestData.Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
+		final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
+		for(int i = 0; i < NUM_PROBES; i++) {
+			Map<TestData.Key, Collection<RecordMatch>> tmp;
+			expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
+			nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+		}
+		
+		final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
+		
+		final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+
+		// reset the generators
+		bgen.reset();
+		pgen.reset();
+		buildInput.reset();
+		probeInput.reset();
+
+		// compare with iterator values
+		ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator =
+				new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0);
+		
+		iterator.open();
+		// do first join with both inputs
+		while (iterator.callWithNextKey(firstMatcher, collector));
+
+		// assert that each expected match was seen for the first input
+		for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
+			if (!entry.getValue().isEmpty()) {
+				Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+			}
+		}
+		
+		for(int i = 0; i < NUM_PROBES; i++) {
+			pgen.reset();
+			probeInput.reset();
+			// prepare ..
+			iterator.reopenProbe(probeInput);
+			// .. and do second join
+			while (iterator.callWithNextKey(nMatcher[i], collector));
+			
+			// assert that each expected match was seen for the second input
+			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		
+		iterator.close();
+	}
+	
+	//
+	//
+	//	Tests taken from HahTableITCase!
+	//
+	//
+	
+	private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
+		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
+		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
+		MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
+		List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
+		probes.add(probe1);
+		probes.add(probe2);
+		probes.add(probe3);
+		return new UnionIterator<Record>(probes);
+	}
+	
+	@Test
+	public void testSpillingHashJoinWithMassiveCollisions() throws IOException
+	{
+		// the following two values are known to have a hash-code collision on the initial level.
+		// we use them to make sure one partition grows over-proportionally large
+		final int REPEATED_VALUE_1 = 40559;
+		final int REPEATED_VALUE_2 = 92882;
+		final int REPEATED_VALUE_COUNT_BUILD = 200000;
+		final int REPEATED_VALUE_COUNT_PROBE = 5;
+		
+		final int NUM_KEYS = 1000000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+		
+		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
+		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		builds.add(build1);
+		builds.add(build2);
+		builds.add(build3);
+		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+	
+		
+		
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+		
+		// create the map for validating the results
+		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
+		
+		// ----------------------------------------------------------------------------------------
+		
+		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
+				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
+				memSegments, ioManager);
+		
+		for(int probe = 0; probe < NUM_PROBES; probe++) {
+			// create a probe input that gives 10 million pairs with 10 values sharing a key
+			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			if(probe == 0) {
+				join.open(buildInput, probeInput);
+			} else {
+				join.reopenProbe(probeInput);
+			}
+		
+			Record record;
+			final Record recordReuse = new Record();
+
+			while (join.nextRecord())
+			{
+				int numBuildValues = 0;
+		
+				final Record probeRec = join.getCurrentProbeRecord();
+				int key = probeRec.getField(0, IntValue.class).getValue();
+				
+				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				if ((record = buildSide.next(recordReuse)) != null) {
+					numBuildValues = 1;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
+				}
+				else {
+					fail("No build side values found for a probe key.");
+				}
+				while ((record = buildSide.next(record)) != null) {
+					numBuildValues++;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+				}
+				
+				Long contained = map.get(key);
+				if (contained == null) {
+					contained = Long.valueOf(numBuildValues);
+				}
+				else {
+					contained = Long.valueOf(contained.longValue() + numBuildValues);
+				}
+				
+				map.put(key, contained);
+			}
+		}
+		
+		join.close();
+		
+		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
+		for (Map.Entry<Integer, Long> entry : map.entrySet()) {
+			long val = entry.getValue();
+			int key = entry.getKey();
+	
+			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
+							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
+			} else {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
+							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
+			}
+		}
+		
+		
+		// ----------------------------------------------------------------------------------------
+		
+		this.memoryManager.release(join.getFreedMemory());
+	}
+	
+	/*
+	 * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number
+	 * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
+	 * fits into memory by itself and needs to be repartitioned in the recursion again.
+	 */
+	@Test
+	public void testSpillingHashJoinWithTwoRecursions() throws IOException
+	{
+		// the following two values are known to have a hash-code collision on the first recursion level.
+		// we use them to make sure one partition grows over-proportionally large
+		final int REPEATED_VALUE_1 = 40559;
+		final int REPEATED_VALUE_2 = 92882;
+		final int REPEATED_VALUE_COUNT_BUILD = 200000;
+		final int REPEATED_VALUE_COUNT_PROBE = 5;
+		
+		final int NUM_KEYS = 1000000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+		
+		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
+		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		builds.add(build1);
+		builds.add(build2);
+		builds.add(build3);
+		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+	
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+		
+		// create the map for validating the results
+		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
+		
+		// ----------------------------------------------------------------------------------------
+		
+		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
+				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
+				memSegments, ioManager);
+		for(int probe = 0; probe < NUM_PROBES; probe++) {
+			// create a probe input that gives 10 million pairs with 10 values sharing a key
+			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			if(probe == 0) {
+				join.open(buildInput, probeInput);
+			} else {
+				join.reopenProbe(probeInput);
+			}
+			Record record;
+			final Record recordReuse = new Record();
+
+			while (join.nextRecord())
+			{	
+				int numBuildValues = 0;
+				
+				final Record probeRec = join.getCurrentProbeRecord();
+				int key = probeRec.getField(0, IntValue.class).getValue();
+				
+				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				if ((record = buildSide.next(recordReuse)) != null) {
+					numBuildValues = 1;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
+				}
+				else {
+					fail("No build side values found for a probe key.");
+				}
+				while ((record = buildSide.next(recordReuse)) != null) {
+					numBuildValues++;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+				}
+				
+				Long contained = map.get(key);
+				if (contained == null) {
+					contained = Long.valueOf(numBuildValues);
+				}
+				else {
+					contained = Long.valueOf(contained.longValue() + numBuildValues);
+				}
+				
+				map.put(key, contained);
+			}
+		}
+		
+		join.close();
+		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
+		for (Map.Entry<Integer, Long> entry : map.entrySet()) {
+			long val = entry.getValue();
+			int key = entry.getKey();
+	
+			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
+							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
+			} else {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
+							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
+			}
+		}
+		
+		
+		// ----------------------------------------------------------------------------------------
+		
+		this.memoryManager.release(join.getFreedMemory());
+	}
+	
+	
+	static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
+		Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
+		for(Map.Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+			List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
+			for(RecordMatch m : entry.getValue()) {
+				matches.add(m);
+			}
+			copy.put(entry.getKey(), matches);
+		}
+		return copy;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index a9d17c5..0ba9823 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -44,7 +44,7 @@ import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.Key;
 import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
 import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
@@ -336,7 +336,7 @@ public class CombiningUnilateralSortMergerITCase {
 	
 	private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Record> data, TypeSerializer<Record> serializer, TypeComparator<Record> comparator) {
 		
-		final KeyGroupedIterator<Record> groupIter = new KeyGroupedIterator<Record>(data, serializer, comparator);
+		final ReusingKeyGroupedIterator<Record> groupIter = new ReusingKeyGroupedIterator<Record>(data, serializer, comparator);
 		
 		return new Iterator<Integer>() {
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
index f2a3fc7..f4ceed3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
@@ -279,6 +279,11 @@ public class MassiveStringValueSortingITCase {
 			reuse.setValue(line);
 			return reuse;
 		}
+
+		@Override
+		public StringValue next() throws IOException {
+			return next(new StringValue());
+		}
 	}
 	
 	private static final class StringValueTupleReaderMutableObjectIterator implements MutableObjectIterator<Tuple2<StringValue, StringValue[]>> {
@@ -306,6 +311,11 @@ public class MassiveStringValueSortingITCase {
 			
 			return reuse;
 		}
+
+		@Override
+		public Tuple2<StringValue, StringValue[]> next() throws IOException {
+			return next(new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]));
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
new file mode 100644
index 0000000..1a6884e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+public class NonReusingSortMergeCoGroupIteratorITCase
+{
+	// the size of the left and right inputs
+	private static final int INPUT_1_SIZE = 20000;
+
+	private static final int INPUT_2_SIZE = 1000;
+
+	// random seeds for the left and right input data generators
+	private static final long SEED1 = 561349061987311L;
+
+	private static final long SEED2 = 231434613412342L;
+
+	// left and right input data generators
+	private Generator generator1;
+
+	private Generator generator2;
+
+	// left and right input RecordReader mocks
+	private MutableObjectIterator<Record> reader1;
+
+	private MutableObjectIterator<Record> reader2;
+	
+	
+	private TypeSerializer<Record> serializer1;
+	private TypeSerializer<Record> serializer2;
+	private TypeComparator<Record> comparator1;
+	private TypeComparator<Record> comparator2;
+	private TypePairComparator<Record, Record> pairComparator;
+
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void beforeTest() {
+		this.serializer1 = RecordSerializer.get();
+		this.serializer2 = RecordSerializer.get();
+		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
+		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
+		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
+	}
+	
+	@Test
+	public void testMerge() {
+		try {
+			
+			generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+
+			reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+			reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data
+			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
+			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
+			Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+	
+			// compare with iterator values
+			NonReusingSortMergeCoGroupIterator<Record, Record> iterator =	new NonReusingSortMergeCoGroupIterator<Record, Record>(
+					this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
+					this.pairComparator);
+	
+			iterator.open();
+			
+			final TestData.Key key = new TestData.Key();
+			while (iterator.next())
+			{
+				Iterator<Record> iter1 = iterator.getValues1().iterator();
+				Iterator<Record> iter2 = iterator.getValues2().iterator();
+				
+				TestData.Value v1 = null;
+				TestData.Value v2 = null;
+				
+				if (iter1.hasNext()) {
+					Record rec = iter1.next();
+					rec.getFieldInto(0, key);
+					v1 = rec.getField(1, TestData.Value.class);
+				}
+				else if (iter2.hasNext()) {
+					Record rec = iter2.next();
+					rec.getFieldInto(0, key);
+					v2 = rec.getField(1, TestData.Value.class);
+				}
+				else {
+					Assert.fail("No input on both sides.");
+				}
+	
+				// assert that matches for this key exist
+				Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
+				
+				Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
+				Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
+				
+				if (v1 != null) {
+					expValues1.remove(v1);
+				}
+				else {
+					expValues2.remove(v2);
+				}
+				
+				while(iter1.hasNext()) {
+					Record rec = iter1.next();
+					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
+				}
+				Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
+				
+				while(iter2.hasNext()) {
+					Record rec = iter2.next();
+					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
+				}
+				Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
+	
+				expectedCoGroupsMap.remove(key);
+			}
+			iterator.close();
+	
+			Assert.assertTrue("Expected key set not empty", expectedCoGroupsMap.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
+			Map<TestData.Key, Collection<TestData.Value>> leftMap,
+			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	{
+		Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
+
+		Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
+		keySet.addAll(rightMap.keySet());
+		
+		for (TestData.Key key : keySet) {
+			Collection<TestData.Value> leftValues = leftMap.get(key);
+			Collection<TestData.Value> rightValues = rightMap.get(key);
+			ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
+			
+			if (leftValues == null) {
+				list.add(new ArrayList<TestData.Value>(0));
+			} else {
+				list.add(leftValues);
+			}
+			
+			if (rightValues == null) {
+				list.add(new ArrayList<TestData.Value>(0));
+			} else {
+				list.add(rightValues);
+			}
+			
+			map.put(key, list);
+		}
+		return map;
+	}
+
+	private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
+	throws Exception
+	{
+		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
+		Record pair = new Record();
+		
+		for (int i = 0; i < num; i++) {
+			iter.next(pair);
+			TestData.Key key = pair.getField(0, TestData.Key.class);
+			
+			if (!map.containsKey(key)) {
+				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+			}
+
+			Collection<TestData.Value> values = map.get(key);
+			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+		}
+		return map;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
new file mode 100644
index 0000000..a487a65
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.sort;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class ReusingSortMergeCoGroupIteratorITCase
+{
+	// the size of the left and right inputs
+	private static final int INPUT_1_SIZE = 20000;
+
+	private static final int INPUT_2_SIZE = 1000;
+
+	// random seeds for the left and right input data generators
+	private static final long SEED1 = 561349061987311L;
+
+	private static final long SEED2 = 231434613412342L;
+
+	// left and right input data generators
+	private Generator generator1;
+
+	private Generator generator2;
+
+	// left and right input RecordReader mocks
+	private MutableObjectIterator<Record> reader1;
+
+	private MutableObjectIterator<Record> reader2;
+	
+	
+	private TypeSerializer<Record> serializer1;
+	private TypeSerializer<Record> serializer2;
+	private TypeComparator<Record> comparator1;
+	private TypeComparator<Record> comparator2;
+	private TypePairComparator<Record, Record> pairComparator;
+
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void beforeTest() {
+		this.serializer1 = RecordSerializer.get();
+		this.serializer2 = RecordSerializer.get();
+		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
+		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
+		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
+	}
+	
+	@Test
+	public void testMerge() {
+		try {
+			
+			generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+
+			reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+			reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data
+			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
+			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
+			Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+	
+			// compare with iterator values
+			ReusingSortMergeCoGroupIterator<Record, Record> iterator =	new ReusingSortMergeCoGroupIterator<Record, Record>(
+					this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
+					this.pairComparator);
+	
+			iterator.open();
+			
+			final TestData.Key key = new TestData.Key();
+			while (iterator.next())
+			{
+				Iterator<Record> iter1 = iterator.getValues1().iterator();
+				Iterator<Record> iter2 = iterator.getValues2().iterator();
+				
+				TestData.Value v1 = null;
+				TestData.Value v2 = null;
+				
+				if (iter1.hasNext()) {
+					Record rec = iter1.next();
+					rec.getFieldInto(0, key);
+					v1 = rec.getField(1, TestData.Value.class);
+				}
+				else if (iter2.hasNext()) {
+					Record rec = iter2.next();
+					rec.getFieldInto(0, key);
+					v2 = rec.getField(1, TestData.Value.class);
+				}
+				else {
+					Assert.fail("No input on both sides.");
+				}
+	
+				// assert that matches for this key exist
+				Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
+				
+				Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
+				Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
+				
+				if (v1 != null) {
+					expValues1.remove(v1);
+				}
+				else {
+					expValues2.remove(v2);
+				}
+				
+				while(iter1.hasNext()) {
+					Record rec = iter1.next();
+					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
+				}
+				Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
+				
+				while(iter2.hasNext()) {
+					Record rec = iter2.next();
+					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
+				}
+				Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
+	
+				expectedCoGroupsMap.remove(key);
+			}
+			iterator.close();
+	
+			Assert.assertTrue("Expected key set not empty", expectedCoGroupsMap.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
+			Map<TestData.Key, Collection<TestData.Value>> leftMap,
+			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	{
+		Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
+
+		Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
+		keySet.addAll(rightMap.keySet());
+		
+		for (TestData.Key key : keySet) {
+			Collection<TestData.Value> leftValues = leftMap.get(key);
+			Collection<TestData.Value> rightValues = rightMap.get(key);
+			ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
+			
+			if (leftValues == null) {
+				list.add(new ArrayList<TestData.Value>(0));
+			} else {
+				list.add(leftValues);
+			}
+			
+			if (rightValues == null) {
+				list.add(new ArrayList<TestData.Value>(0));
+			} else {
+				list.add(rightValues);
+			}
+			
+			map.put(key, list);
+		}
+		return map;
+	}
+
+	private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
+	throws Exception
+	{
+		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
+		Record pair = new Record();
+		
+		for (int i = 0; i < num; i++) {
+			iter.next(pair);
+			TestData.Key key = pair.getField(0, TestData.Key.class);
+			
+			if (!map.containsKey(key)) {
+				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+			}
+
+			Collection<TestData.Value> values = map.get(key);
+			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+		}
+		return map;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
deleted file mode 100644
index 9d6fafb..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- */
-public class SortMergeCoGroupIteratorITCase
-{
-	// the size of the left and right inputs
-	private static final int INPUT_1_SIZE = 20000;
-
-	private static final int INPUT_2_SIZE = 1000;
-
-	// random seeds for the left and right input data generators
-	private static final long SEED1 = 561349061987311L;
-
-	private static final long SEED2 = 231434613412342L;
-
-	// left and right input data generators
-	private Generator generator1;
-
-	private Generator generator2;
-
-	// left and right input RecordReader mocks
-	private MutableObjectIterator<Record> reader1;
-
-	private MutableObjectIterator<Record> reader2;
-	
-	
-	private TypeSerializer<Record> serializer1;
-	private TypeSerializer<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator;
-
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void beforeTest() {
-		this.serializer1 = RecordSerializer.get();
-		this.serializer2 = RecordSerializer.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
-	}
-	
-	@Test
-	public void testMerge() {
-		try {
-			
-			generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-
-			reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-
-			// collect expected data
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
-			Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
-			Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
-	
-			// reset the generators
-			generator1.reset();
-			generator2.reset();
-	
-			// compare with iterator values
-			SortMergeCoGroupIterator<Record, Record> iterator =	new SortMergeCoGroupIterator<Record, Record>(
-					this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
-					this.pairComparator);
-	
-			iterator.open();
-			
-			final TestData.Key key = new TestData.Key();
-			while (iterator.next())
-			{
-				Iterator<Record> iter1 = iterator.getValues1().iterator();
-				Iterator<Record> iter2 = iterator.getValues2().iterator();
-				
-				TestData.Value v1 = null;
-				TestData.Value v2 = null;
-				
-				if (iter1.hasNext()) {
-					Record rec = iter1.next();
-					rec.getFieldInto(0, key);
-					v1 = rec.getField(1, TestData.Value.class);
-				}
-				else if (iter2.hasNext()) {
-					Record rec = iter2.next();
-					rec.getFieldInto(0, key);
-					v2 = rec.getField(1, TestData.Value.class);
-				}
-				else {
-					Assert.fail("No input on both sides.");
-				}
-	
-				// assert that matches for this key exist
-				Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
-				
-				Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
-				Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
-				
-				if (v1 != null) {
-					expValues1.remove(v1);
-				}
-				else {
-					expValues2.remove(v2);
-				}
-				
-				while(iter1.hasNext()) {
-					Record rec = iter1.next();
-					Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
-				}
-				Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
-				
-				while(iter2.hasNext()) {
-					Record rec = iter2.next();
-					Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
-				}
-				Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
-	
-				expectedCoGroupsMap.remove(key);
-			}
-			iterator.close();
-	
-			Assert.assertTrue("Expected key set not empty", expectedCoGroupsMap.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
-	{
-		Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
-
-		Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
-		keySet.addAll(rightMap.keySet());
-		
-		for (TestData.Key key : keySet) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
-			ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
-			
-			if (leftValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
-			} else {
-				list.add(leftValues);
-			}
-			
-			if (rightValues == null) {
-				list.add(new ArrayList<TestData.Value>(0));
-			} else {
-				list.add(rightValues);
-			}
-			
-			map.put(key, list);
-		}
-		return map;
-	}
-
-	private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
-	throws Exception
-	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
-		
-		for (int i = 0; i < num; i++) {
-			iter.next(pair);
-			TestData.Key key = pair.getField(0, TestData.Key.class);
-			
-			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
-			}
-
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
-		}
-		return map;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 02206f6..0e169ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -43,7 +48,10 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class DriverTestBase<S extends Function> implements PactTaskContext<S, Record> {
 	
 	protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
@@ -79,12 +87,14 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	private PactDriver<S, Record> driver;
 	
 	private volatile boolean running;
+
+	private ExecutionConfig executionConfig;
 	
-	protected DriverTestBase(long memory, int maxNumSorters) {
-		this(memory, maxNumSorters, DEFAULT_PER_SORT_MEM);
+	protected DriverTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters) {
+		this(executionConfig, memory, maxNumSorters, DEFAULT_PER_SORT_MEM);
 	}
 	
-	protected DriverTestBase(long memory, int maxNumSorters, long perSortMemory) {
+	protected DriverTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters, long perSortMemory) {
 		if (memory < 0 || maxNumSorters < 0 || perSortMemory < 0) {
 			throw new IllegalArgumentException();
 		}
@@ -104,6 +114,27 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		
 		this.config = new Configuration();
 		this.taskConfig = new TaskConfig(this.config);
+
+		this.executionConfig = executionConfig;
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Object[]> configs = new LinkedList<Object[]>();
+
+		ExecutionConfig withReuse = new ExecutionConfig();
+		withReuse.enableObjectReuse();
+
+		ExecutionConfig withoutReuse = new ExecutionConfig();
+		withoutReuse.disableObjectReuse();
+
+		Object[] a = { withoutReuse };
+		configs.add(a);
+		Object[] b = { withReuse };
+		configs.add(b);
+
+		return configs;
 	}
 
 	public void addInput(MutableObjectIterator<Record> input) {
@@ -141,16 +172,21 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 
 	@SuppressWarnings({"unchecked","rawtypes"})
 	public void testDriver(PactDriver driver, Class stubClass) throws Exception {
-		
+		testDriverInternal(driver, stubClass);
+	}
+
+	@SuppressWarnings({"unchecked","rawtypes"})
+	public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception {
+
 		this.driver = driver;
 		driver.setup(this);
-		
+
 		this.stub = (S)stubClass.newInstance();
-		
+
 		// regular running logic
 		this.running = true;
 		boolean stubOpen = false;
-		
+
 		try {
 			// run the data preparation
 			try {
@@ -159,7 +195,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 			catch (Throwable t) {
 				throw new Exception("The data preparation caused an error: " + t.getMessage(), t);
 			}
-			
+
 			// open stub implementation
 			try {
 				FunctionUtils.openFunction(this.stub, getTaskConfig().getStubParameters());
@@ -168,16 +204,16 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 			catch (Throwable t) {
 				throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
 			}
-			
+
 			// run the user code
 			driver.run();
-			
+
 			// close. We close here such that a regular close throwing an exception marks a task as failed.
 			if (this.running) {
 				FunctionUtils.closeFunction (this.stub);
 				stubOpen = false;
 			}
-			
+
 			this.output.close();
 		}
 		catch (Exception ex) {
@@ -188,7 +224,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 				}
 				catch (Throwable t) {}
 			}
-			
+
 			// if resettable driver invoke treardown
 			if (this.driver instanceof ResettablePactDriver) {
 				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
@@ -198,18 +234,18 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 					throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
 				}
 			}
-			
+
 			// drop exception, if the task was canceled
 			if (this.running) {
 				throw ex;
 			}
-			
+
 		}
 		finally {
 			driver.cleanup();
 		}
 	}
-	
+
 	@SuppressWarnings({"unchecked","rawtypes"})
 	public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception {
 
@@ -242,6 +278,13 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	public TaskConfig getTaskConfig() {
 		return this.taskConfig;
 	}
+
+
+
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
 	
 	@Override
 	public ClassLoader getUserCodeClassLoader() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 63a168b..0629ea0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -55,6 +55,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -192,8 +193,15 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 		public InputChannelResult readRecord(DeserializationDelegate<Record> target) throws IOException, InterruptedException {
 
 			Record reuse = target != null ? target.getInstance() : null;
+
+			// Handle NonReusingDeserializationDelegate, which by default
+			// does not have a Record instance
+			if (reuse == null && target != null) {
+				reuse = new Record();
+				target.setInstance(reuse);
+			}
 			
-			if ((reuse = it.next(reuse)) != null) {
+			if (it.next(reuse) != null) {
 				// everything comes from the same source channel and buffer in this mock
 				notifyRecordIsAvailable(0);
 				return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index ba38776..e760a1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
 import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
@@ -183,8 +183,8 @@ public class HashVsSortMiniBenchmark {
 			long start = System.nanoTime();
 			
 			// compare with iterator values
-			final BuildFirstHashMatchIterator<Record, Record, Record> iterator = 
-					new BuildFirstHashMatchIterator<Record, Record, Record>(
+			final ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
+					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
@@ -222,8 +222,8 @@ public class HashVsSortMiniBenchmark {
 			long start = System.nanoTime();
 			
 			// compare with iterator values
-			BuildSecondHashMatchIterator<Record, Record, Record> iterator = 
-					new BuildSecondHashMatchIterator<Record, Record, Record>(
+			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
+					new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 						this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
deleted file mode 100644
index 3d1a80b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test for the safe key grouped iterator, which advances in windows containing the same key and provides a sub-iterator
- * over the records with the same key.
- */
-public class KeyGroupedIteratorImmutableTest {
-	
-	private MutableObjectIterator<Record> sourceIter;		// the iterator that provides the input
-	
-	private KeyGroupedIteratorImmutable<Record> psi;					// the grouping iterator, progressing in key steps
-	
-	@Before
-	public void setup()
-	{
-		final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>();
-		
-		// add elements to the source
-		source.add(new IntStringPair(new IntValue(1), new StringValue("A")));
-		source.add(new IntStringPair(new IntValue(2), new StringValue("B")));
-		source.add(new IntStringPair(new IntValue(3), new StringValue("C")));
-		source.add(new IntStringPair(new IntValue(3), new StringValue("D")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("E")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("F")));
-		source.add(new IntStringPair(new IntValue(4), new StringValue("G")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("H")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("I")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("J")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("K")));
-		source.add(new IntStringPair(new IntValue(5), new StringValue("L")));
-		
-		
-		this.sourceIter = new MutableObjectIterator<Record>() {
-			final Iterator<IntStringPair> it = source.iterator();
-			
-			@Override
-			public Record next(Record reuse) throws IOException {
-				if (it.hasNext()) {
-					IntStringPair pair = it.next();
-					reuse.setField(0, pair.getInteger());
-					reuse.setField(1, pair.getString());
-					return reuse;
-				}
-				else {
-					return null;
-				}
-			}
-
-			@Override
-			public Record next() throws IOException {
-				if (it.hasNext()) {
-					IntStringPair pair = it.next();
-					Record result = new Record(2);
-					result.setField(0, pair.getInteger());
-					result.setField(1, pair.getString());
-					return result;
-				}
-				else {
-					return null;
-				}
-			}
-		};
-		
-		final RecordSerializer serializer = RecordSerializer.get();
-		@SuppressWarnings("unchecked")
-		final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class});
-		
-		this.psi = new KeyGroupedIteratorImmutable<Record>(this.sourceIter, serializer, comparator);
-	}
-
-	@Test
-	public void testNextKeyOnly() throws Exception
-	{
-		try {
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues());
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testFullIterationThroughAllValues() throws IOException
-	{
-		try {
-			// Key 1, Value A
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 2, Value B
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 3, Values C, D
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {}
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {}
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 4, Values E, F, G
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Key 5, Values H, I, J, K, L
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {}
-			Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			try {
-				this.psi.getValues().next();
-				Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next().");
-			}
-			catch (NoSuchElementException nseex) {}
-			
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testMixedProgress() throws Exception
-	{
-		try {
-			// Progression only via nextKey() and hasNext() - Key 1, Value A
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			
-			// Progression only through nextKey() - Key 2, Value B
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			
-			// Progression first though haNext() and next(), then through hasNext() - Key 3, Values C, D
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			
-			// Progression first via next() only, then hasNext() only Key 4, Values E, F, G
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			
-			// Key 5, Values H, I, J, K, L
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5))));
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext());
-			
-			// end
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-			Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	@Test
-	public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception
-	{
-		try {
-			Iterator<Record> valsIter = null;
-			Record rec = null;
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class));			
-			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext());
-			
-			Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey());
-			valsIter = this.psi.getValues();
-			Assert.assertNotNull("Returned Iterator must not be null", valsIter);
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
-			Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class));
-			rec = valsIter.next();
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-			Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue());
-			Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class));
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test encountered an unexpected exception.");
-		}
-	}
-	
-	private static final class IntStringPair
-	{
-		private final IntValue integer;
-		private final StringValue string;
-
-		IntStringPair(IntValue integer, StringValue string) {
-			this.integer = integer;
-			this.string = string;
-		}
-
-		public IntValue getInteger() {
-			return integer;
-		}
-
-		public StringValue getString() {
-			return string;
-		}
-	}
-}


Mime
View raw message