flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [06/13] flink git commit: [FLINK-1285] Make execution mode configurable
Date Thu, 08 Jan 2015 10:59:01 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
new file mode 100644
index 0000000..5012d1e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
+import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.RecordMatch;
+import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase
+		.RecordMatchRemovingJoin;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Key;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Test specialized hash join that keeps the build side data (in memory and on hard disk)
+ * This is used for iterative tasks.
+ */
+@SuppressWarnings("deprecation")
+public class NonReusingReOpenableHashTableITCase {
+
+	private static final int PAGE_SIZE = 8 * 1024;
+	private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 pages.
+
+	private static final long SEED1 = 561349061987311L;
+	private static final long SEED2 = 231434613412342L;
+
+	private static final int NUM_PROBES = 3; // number of reopenings of hash join
+
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+
+	private TypeSerializer<Record> recordSerializer;
+	private TypeComparator<Record> record1Comparator;
+	private TypeComparator<Record> record2Comparator;
+	private TypePairComparator<Record, Record> recordPairComparator;
+
+
+
+
+	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+	private TypeSerializer<Record> recordBuildSideAccesssor;
+	private TypeSerializer<Record> recordProbeSideAccesssor;
+	private TypeComparator<Record> recordBuildSideComparator;
+	private TypeComparator<Record> recordProbeSideComparator;
+	private TypePairComparator<Record, Record> pactRecordComparator;
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void beforeTest()
+	{
+		this.recordSerializer = RecordSerializer.get();
+
+		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
+		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
+		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {Key.class});
+
+
+		final int[] keyPos = new int[] {0};
+		final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
+
+		this.recordBuildSideAccesssor = RecordSerializer.get();
+		this.recordProbeSideAccesssor = RecordSerializer.get();
+		this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
+		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
+		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
+
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest()
+	{
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+
+	/**
+	 * Test behavior with overflow buckets (Overflow buckets must be initialized correctly
+	 * if the input is reopened again)
+	 */
+	@Test
+	public void testOverflow() {
+
+		int buildSize = 1000;
+		int probeSize = 1000;
+		try {
+			Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
+			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			doTest(buildInput,probeInput, bgen, pgen);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Verify proper operation if the build side is spilled to disk.
+	 */
+	@Test
+	public void testDoubleProbeSpilling() {
+
+		int buildSize = 1000;
+		int probeSize = 1000;
+		try {
+			Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
+			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+			doTest(buildInput,probeInput, bgen, pgen);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * This test case verifies that hybrid hash join is able to handle multiple probe phases
+	 * when the build side fits completely into memory.
+	 */
+	@Test
+	public void testDoubleProbeInMemory() {
+
+		int buildSize = 1000;
+		int probeSize = 1000;
+		try {
+			Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+			Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
+			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+
+			doTest(buildInput,probeInput, bgen, pgen);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
+		// collect expected data
+		final Map<Key, Collection<RecordMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchRecordValues(NonReusingHashMatchIteratorITCase.collectRecordData(buildInput), NonReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+
+		final List<Map<Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
+		final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
+		for(int i = 0; i < NUM_PROBES; i++) {
+			Map<Key, Collection<RecordMatch>> tmp;
+			expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
+			nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+		}
+
+		final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
+
+		final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+
+		// reset the generators
+		bgen.reset();
+		pgen.reset();
+		buildInput.reset();
+		probeInput.reset();
+
+		// compare with iterator values
+		NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator =
+				new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+						buildInput, probeInput, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0);
+
+		iterator.open();
+		// do first join with both inputs
+		while (iterator.callWithNextKey(firstMatcher, collector));
+
+		// assert that each expected match was seen for the first input
+		for (Entry<Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
+			if (!entry.getValue().isEmpty()) {
+				Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+			}
+		}
+
+		for(int i = 0; i < NUM_PROBES; i++) {
+			pgen.reset();
+			probeInput.reset();
+			// prepare ..
+			iterator.reopenProbe(probeInput);
+			// .. and do second join
+			while (iterator.callWithNextKey(nMatcher[i], collector));
+
+			// assert that each expected match was seen for the second input
+			for (Entry<Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+
+		iterator.close();
+	}
+
+	//
+	//
+	//	Tests taken from HashTableITCase!
+	//
+	//
+
+	private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
+		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
+		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
+		MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
+		List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
+		probes.add(probe1);
+		probes.add(probe2);
+		probes.add(probe3);
+		return new UnionIterator<Record>(probes);
+	}
+
+	@Test
+	public void testSpillingHashJoinWithMassiveCollisions() throws IOException
+	{
+		// the following two values are known to have a hash-code collision on the initial level.
+		// we use them to make sure one partition grows over-proportionally large
+		final int REPEATED_VALUE_1 = 40559;
+		final int REPEATED_VALUE_2 = 92882;
+		final int REPEATED_VALUE_COUNT_BUILD = 200000;
+		final int REPEATED_VALUE_COUNT_PROBE = 5;
+
+		final int NUM_KEYS = 1000000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+
+		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
+		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		builds.add(build1);
+		builds.add(build2);
+		builds.add(build3);
+		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+
+
+
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+
+		// create the map for validating the results
+		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
+
+		// ----------------------------------------------------------------------------------------
+
+		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
+				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
+				memSegments, ioManager);
+
+		for(int probe = 0; probe < NUM_PROBES; probe++) {
+			// create a probe input that gives 10 million pairs with 10 values sharing a key
+			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			if(probe == 0) {
+				join.open(buildInput, probeInput);
+			} else {
+				join.reopenProbe(probeInput);
+			}
+
+			Record record;
+			final Record recordReuse = new Record();
+
+			while (join.nextRecord())
+			{
+				int numBuildValues = 0;
+
+				final Record probeRec = join.getCurrentProbeRecord();
+				int key = probeRec.getField(0, IntValue.class).getValue();
+
+				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				if ((record = buildSide.next(recordReuse)) != null) {
+					numBuildValues = 1;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+				}
+				else {
+					fail("No build side values found for a probe key.");
+				}
+				while ((record = buildSide.next(record)) != null) {
+					numBuildValues++;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+				}
+
+				Long contained = map.get(key);
+				if (contained == null) {
+					contained = Long.valueOf(numBuildValues);
+				}
+				else {
+					contained = Long.valueOf(contained.longValue() + numBuildValues);
+				}
+
+				map.put(key, contained);
+			}
+		}
+
+		join.close();
+
+		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
+		for (Entry<Integer, Long> entry : map.entrySet()) {
+			long val = entry.getValue();
+			int key = entry.getKey();
+
+			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
+							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
+			} else {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
+							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
+			}
+		}
+
+
+		// ----------------------------------------------------------------------------------------
+
+		this.memoryManager.release(join.getFreedMemory());
+	}
+
+	/*
+	 * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number
+	 * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
+	 * fits into memory by itself and needs to be repartitioned in the recursion again.
+	 */
+	@Test
+	public void testSpillingHashJoinWithTwoRecursions() throws IOException
+	{
+		// the following two values are known to have a hash-code collision on the first recursion level.
+		// we use them to make sure one partition grows over-proportionally large
+		final int REPEATED_VALUE_1 = 40559;
+		final int REPEATED_VALUE_2 = 92882;
+		final int REPEATED_VALUE_COUNT_BUILD = 200000;
+		final int REPEATED_VALUE_COUNT_PROBE = 5;
+
+		final int NUM_KEYS = 1000000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+
+		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
+		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+		builds.add(build1);
+		builds.add(build2);
+		builds.add(build3);
+		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+
+		// create the map for validating the results
+		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
+
+		// ----------------------------------------------------------------------------------------
+
+		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
+				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
+				memSegments, ioManager);
+		for(int probe = 0; probe < NUM_PROBES; probe++) {
+			// create a probe input that gives 10 million pairs with 10 values sharing a key
+			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			if(probe == 0) {
+				join.open(buildInput, probeInput);
+			} else {
+				join.reopenProbe(probeInput);
+			}
+			Record record;
+			final Record recordReuse = new Record();
+
+			while (join.nextRecord())
+			{
+				int numBuildValues = 0;
+
+				final Record probeRec = join.getCurrentProbeRecord();
+				int key = probeRec.getField(0, IntValue.class).getValue();
+
+				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				if ((record = buildSide.next(recordReuse)) != null) {
+					numBuildValues = 1;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+				}
+				else {
+					fail("No build side values found for a probe key.");
+				}
+				while ((record = buildSide.next(recordReuse)) != null) {
+					numBuildValues++;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+				}
+
+				Long contained = map.get(key);
+				if (contained == null) {
+					contained = Long.valueOf(numBuildValues);
+				}
+				else {
+					contained = Long.valueOf(contained.longValue() + numBuildValues);
+				}
+
+				map.put(key, contained);
+			}
+		}
+
+		join.close();
+		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
+		for (Entry<Integer, Long> entry : map.entrySet()) {
+			long val = entry.getValue();
+			int key = entry.getKey();
+
+			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
+							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
+			} else {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
+							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
+			}
+		}
+
+
+		// ----------------------------------------------------------------------------------------
+
+		this.memoryManager.release(join.getFreedMemory());
+	}
+
+
+	static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
+		Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
+		for(Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+			List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
+			for(RecordMatch m : entry.getValue()) {
+				matches.add(m);
+			}
+			copy.put(entry.getKey(), matches);
+		}
+		return copy;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
deleted file mode 100644
index 71f1979..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.hash.HashMatchIteratorITCase.RecordMatch;
-import org.apache.flink.runtime.operators.hash.HashMatchIteratorITCase.RecordMatchRemovingJoin;
-import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
-import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test specialized hash join that keeps the build side data (in memory and on hard disk)
- * This is used for iterative tasks.
- */
-@SuppressWarnings("deprecation")
-public class ReOpenableHashTableITCase {
-	
-	private static final int PAGE_SIZE = 8 * 1024;
-	private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
-
-	private static final long SEED1 = 561349061987311L;
-	private static final long SEED2 = 231434613412342L;
-	
-	private static final int NUM_PROBES = 3; // number of reopenings of hash join
-	
-	private final AbstractInvokable parentTask = new DummyInvokable();
-
-	private IOManager ioManager;
-	private MemoryManager memoryManager;
-	
-	private TypeSerializer<Record> recordSerializer;
-	private TypeComparator<Record> record1Comparator;
-	private TypeComparator<Record> record2Comparator;
-	private TypePairComparator<Record, Record> recordPairComparator;
-	
-	
-	
-	
-	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-	private TypeSerializer<Record> recordBuildSideAccesssor;
-	private TypeSerializer<Record> recordProbeSideAccesssor;
-	private TypeComparator<Record> recordBuildSideComparator;
-	private TypeComparator<Record> recordProbeSideComparator;
-	private TypePairComparator<Record, Record> pactRecordComparator;
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void beforeTest()
-	{
-		this.recordSerializer = RecordSerializer.get();
-		
-		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
-		
-		
-		final int[] keyPos = new int[] {0};
-		final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
-		
-		this.recordBuildSideAccesssor = RecordSerializer.get();
-		this.recordProbeSideAccesssor = RecordSerializer.get();
-		this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
-		this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
-		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
-		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
-		this.ioManager = new IOManagerAsync();
-	}
-
-	@After
-	public void afterTest()
-	{
-		if (this.ioManager != null) {
-			this.ioManager.shutdown();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
-			this.ioManager = null;
-		}
-		
-		if (this.memoryManager != null) {
-			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
-				this.memoryManager.verifyEmpty());
-			this.memoryManager.shutdown();
-			this.memoryManager = null;
-		}
-	}
-	
-	
-	/**
-	 * Test behavior with overflow buckets (Overflow buckets must be initialized correctly 
-	 * if the input is reopened again)
-	 */
-	@Test
-	public void testOverflow() {
-		
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	/**
-	 * Verify proper operation if the build side is spilled to disk.
-	 */
-	@Test
-	public void testDoubleProbeSpilling() {
-		
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	/**
-	 * This test case verifies that hybrid hash join is able to handle multiple probe phases
-	 * when the build side fits completely into memory.
-	 */
-	@Test
-	public void testDoubleProbeInMemory() {
-		
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			
-			final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
-			final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
-			
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
-		// collect expected data
-		final Map<TestData.Key, Collection<RecordMatch>> expectedFirstMatchesMap = HashMatchIteratorITCase.matchRecordValues(
-			HashMatchIteratorITCase.collectRecordData(buildInput),
-			HashMatchIteratorITCase.collectRecordData(probeInput));
-		
-		final List<Map<TestData.Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
-		final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
-		for(int i = 0; i < NUM_PROBES; i++) {
-			Map<TestData.Key, Collection<RecordMatch>> tmp;
-			expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
-			nMatcher[i] = new RecordMatchRemovingJoin(tmp);
-		}
-		
-		final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
-		
-		final Collector<Record> collector = new DiscardingOutputCollector<Record>();
-
-		// reset the generators
-		bgen.reset();
-		pgen.reset();
-		buildInput.reset();
-		probeInput.reset();
-
-		// compare with iterator values
-		BuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator = 
-				new BuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
-						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
-					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
-		
-		iterator.open();
-		// do first join with both inputs
-		while (iterator.callWithNextKey(firstMatcher, collector));
-
-		// assert that each expected match was seen for the first input
-		for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
-			if (!entry.getValue().isEmpty()) {
-				Assert.fail("Collection for key " + entry.getKey() + " is not empty");
-			}
-		}
-		
-		for(int i = 0; i < NUM_PROBES; i++) {
-			pgen.reset();
-			probeInput.reset();
-			// prepare ..
-			iterator.reopenProbe(probeInput);
-			// .. and do second join
-			while (iterator.callWithNextKey(nMatcher[i], collector));
-			
-			// assert that each expected match was seen for the second input
-			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
-				if (!entry.getValue().isEmpty()) {
-					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
-				}
-			}
-		}
-		
-		iterator.close();
-	}
-	
-	//
-	//
-	//	Tests taken from HashTableITCase!
-	//
-	//
-	
-	private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
-			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
-		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
-		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
-		MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
-		List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
-		probes.add(probe1);
-		probes.add(probe2);
-		probes.add(probe3);
-		return new UnionIterator<Record>(probes);
-	}
-	
-	@Test
-	public void testSpillingHashJoinWithMassiveCollisions() throws IOException
-	{
-		// the following two values are known to have a hash-code collision on the initial level.
-		// we use them to make sure one partition grows over-proportionally large
-		final int REPEATED_VALUE_1 = 40559;
-		final int REPEATED_VALUE_2 = 92882;
-		final int REPEATED_VALUE_COUNT_BUILD = 200000;
-		final int REPEATED_VALUE_COUNT_PROBE = 5;
-		
-		final int NUM_KEYS = 1000000;
-		final int BUILD_VALS_PER_KEY = 3;
-		final int PROBE_VALS_PER_KEY = 10;
-		
-		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
-		builds.add(build1);
-		builds.add(build2);
-		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
-	
-		
-		
-
-		// allocate the memory for the HashTable
-		List<MemorySegment> memSegments;
-		try {
-			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
-		}
-		catch (MemoryAllocationException maex) {
-			fail("Memory for the Join could not be provided.");
-			return;
-		}
-		
-		// create the map for validating the results
-		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-		
-		// ----------------------------------------------------------------------------------------
-		
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
-				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
-				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
-		
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
-			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-			if(probe == 0) {
-				join.open(buildInput, probeInput);
-			} else {
-				join.reopenProbe(probeInput);
-			}
-		
-			Record record;
-			final Record recordReuse = new Record();
-
-			while (join.nextRecord())
-			{
-				int numBuildValues = 0;
-		
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
-				
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
-				if ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
-				}
-				else {
-					fail("No build side values found for a probe key.");
-				}
-				while ((record = buildSide.next(record)) != null) {
-					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
-				}
-				
-				Long contained = map.get(key);
-				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
-				}
-				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
-				}
-				
-				map.put(key, contained);
-			}
-		}
-		
-		join.close();
-		
-		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
-		for (Map.Entry<Integer, Long> entry : map.entrySet()) {
-			long val = entry.getValue();
-			int key = entry.getKey();
-	
-			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
-							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
-			} else {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
-							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
-			}
-		}
-		
-		
-		// ----------------------------------------------------------------------------------------
-		
-		this.memoryManager.release(join.getFreedMemory());
-	}
-	
-	/*
-	 * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number
-	 * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
-	 * fits into memory by itself and needs to be repartitioned in the recursion again.
-	 */
-	@Test
-	public void testSpillingHashJoinWithTwoRecursions() throws IOException
-	{
-		// the following two values are known to have a hash-code collision on the first recursion level.
-		// we use them to make sure one partition grows over-proportionally large
-		final int REPEATED_VALUE_1 = 40559;
-		final int REPEATED_VALUE_2 = 92882;
-		final int REPEATED_VALUE_COUNT_BUILD = 200000;
-		final int REPEATED_VALUE_COUNT_PROBE = 5;
-		
-		final int NUM_KEYS = 1000000;
-		final int BUILD_VALS_PER_KEY = 3;
-		final int PROBE_VALS_PER_KEY = 10;
-		
-		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
-		builds.add(build1);
-		builds.add(build2);
-		builds.add(build3);
-		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
-	
-
-		// allocate the memory for the HashTable
-		List<MemorySegment> memSegments;
-		try {
-			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
-		}
-		catch (MemoryAllocationException maex) {
-			fail("Memory for the Join could not be provided.");
-			return;
-		}
-		
-		// create the map for validating the results
-		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-		
-		// ----------------------------------------------------------------------------------------
-		
-		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
-				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
-				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
-			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-			if(probe == 0) {
-				join.open(buildInput, probeInput);
-			} else {
-				join.reopenProbe(probeInput);
-			}
-			Record record;
-			final Record recordReuse = new Record();
-
-			while (join.nextRecord())
-			{	
-				int numBuildValues = 0;
-				
-				final Record probeRec = join.getCurrentProbeRecord();
-				int key = probeRec.getField(0, IntValue.class).getValue();
-				
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
-				if ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
-				}
-				else {
-					fail("No build side values found for a probe key.");
-				}
-				while ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
-				}
-				
-				Long contained = map.get(key);
-				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
-				}
-				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
-				}
-				
-				map.put(key, contained);
-			}
-		}
-		
-		join.close();
-		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
-		for (Map.Entry<Integer, Long> entry : map.entrySet()) {
-			long val = entry.getValue();
-			int key = entry.getKey();
-	
-			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
-							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
-			} else {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
-							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
-			}
-		}
-		
-		
-		// ----------------------------------------------------------------------------------------
-		
-		this.memoryManager.release(join.getFreedMemory());
-	}
-	
-	
-	static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
-		Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
-		for(Map.Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
-			List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
-			for(RecordMatch m : entry.getValue()) {
-				matches.add(m);
-			}
-			copy.put(entry.getKey(), matches);
-		}
-		return copy;
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
new file mode 100644
index 0000000..18cd8d0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
@@ -0,0 +1,778 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class ReusingHashMatchIteratorITCase {
+	
+	private static final int MEMORY_SIZE = 16000000;		// total memory
+
+	private static final int INPUT_1_SIZE = 20000;
+	private static final int INPUT_2_SIZE = 1000;
+
+	private static final long SEED1 = 561349061987311L;
+	private static final long SEED2 = 231434613412342L;
+	
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+	
+	private TypeSerializer<Record> recordSerializer;
+	private TypeComparator<Record> record1Comparator;
+	private TypeComparator<Record> record2Comparator;
+	private TypePairComparator<Record, Record> recordPairComparator;
+	
+	private TypeSerializer<IntPair> pairSerializer;
+	private TypeComparator<IntPair> pairComparator;
+	private TypePairComparator<IntPair, Record> pairRecordPairComparator;
+	private TypePairComparator<Record, IntPair> recordPairPairComparator;
+
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void beforeTest() {
+		this.recordSerializer = RecordSerializer.get();
+		
+		this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+		
+		this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+		
+		this.pairSerializer = new IntPairSerializer();
+		this.pairComparator = new IntPairComparator();
+		this.pairRecordPairComparator = new IntPairRecordPairComparator();
+		this.recordPairPairComparator = new RecordIntPairPairComparator();
+		
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+		
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+
+	@Test
+	public void testBuildFirst() {
+		try {
+			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			// collect expected data
+			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
+				collectRecordData(input1),
+				collectRecordData(input2));
+			
+			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+			ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
+					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+						input1, input2, this.recordSerializer, this.record1Comparator, 
+						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+						this.memoryManager, ioManager, this.parentTask, 1.0);
+			
+			iterator.open();
+			
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildFirstWithHighNumberOfCommonKeys()
+	{
+		// the size of the left and right inputs
+		final int INPUT_1_SIZE = 200;
+		final int INPUT_2_SIZE = 100;
+		
+		final int INPUT_1_DUPLICATES = 10;
+		final int INPUT_2_DUPLICATES = 2000;
+		final int DUPLICATE_KEY = 13;
+		
+		try {
+			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			
+			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+			
+			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
+			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			
+			
+			// collect expected data
+			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
+				collectRecordData(input1),
+				collectRecordData(input2));
+			
+			// re-create the whole thing for actual processing
+			
+			// reset the generators and iterators
+			generator1.reset();
+			generator2.reset();
+			const1Iter.reset();
+			const2Iter.reset();
+			gen1Iter.reset();
+			gen2Iter.reset();
+			
+			inList1.clear();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			inList2.clear();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+	
+			input1 = new UnionIterator<Record>(inList1);
+			input2 = new UnionIterator<Record>(inList2);
+			
+			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+	
+			ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
+					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+						input1, input2, this.recordSerializer, this.record1Comparator, 
+						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+						this.memoryManager, ioManager, this.parentTask, 1.0);
+
+			iterator.open();
+			
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecond() {
+		try {
+			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			// collect expected data
+			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
+				collectRecordData(input1),
+				collectRecordData(input2));
+			
+			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values			
+			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
+				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+					input1, input2, this.recordSerializer, this.record1Comparator, 
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0);
+
+			iterator.open();
+			
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecondWithHighNumberOfCommonKeys()
+	{
+		// the size of the left and right inputs
+		final int INPUT_1_SIZE = 200;
+		final int INPUT_2_SIZE = 100;
+		
+		final int INPUT_1_DUPLICATES = 10;
+		final int INPUT_2_DUPLICATES = 2000;
+		final int DUPLICATE_KEY = 13;
+		
+		try {
+			Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			
+			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+			
+			MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
+			MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+			
+			
+			// collect expected data
+			final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
+				collectRecordData(input1),
+				collectRecordData(input2));
+			
+			// re-create the whole thing for actual processing
+			
+			// reset the generators and iterators
+			generator1.reset();
+			generator2.reset();
+			const1Iter.reset();
+			const2Iter.reset();
+			gen1Iter.reset();
+			gen2Iter.reset();
+			
+			inList1.clear();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			inList2.clear();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+	
+			input1 = new UnionIterator<Record>(inList1);
+			input2 = new UnionIterator<Record>(inList2);
+			
+			final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+	
+			ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
+				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+					input1, input2, this.recordSerializer, this.record1Comparator, 
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0);
+			
+			iterator.open();
+			
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildFirstWithMixedDataTypes() {
+		try {
+			MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
+			
+			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			// collect expected data
+			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+				collectIntPairData(input1),
+				collectRecordData(input2));
+			
+			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+	
+			// reset the generators
+			input1 = new UniformIntPairGenerator(500, 40, false);
+			generator2.reset();
+			input2.reset();
+	
+			// compare with iterator values
+			ReusingBuildSecondHashMatchIterator<IntPair, Record, Record> iterator =
+					new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
+						input1, input2, this.pairSerializer, this.pairComparator,
+						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
+						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+			
+			iterator.open();
+			
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecondWithMixedDataTypes() {
+		try {
+			// first pass over both inputs: gather the expected join result
+			MutableObjectIterator<IntPair> pairInput = new UniformIntPairGenerator(500, 40, false);
+			
+			final Generator recordGenerator = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final TestData.GeneratorIterator recordInput = new TestData.GeneratorIterator(recordGenerator, INPUT_2_SIZE);
+			
+			final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+				collectIntPairData(pairInput),
+				collectRecordData(recordInput));
+			
+			// the join function removes each produced match from the expectation map
+			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+	
+			// rewind both inputs for the actual join run
+			pairInput = new UniformIntPairGenerator(500, 40, false);
+			recordGenerator.reset();
+			recordInput.reset();
+	
+			// run the hash join and check its output against the expectation map
+			ReusingBuildFirstHashMatchIterator<IntPair, Record, Record> iterator =
+					new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
+						pairInput, recordInput, this.pairSerializer, this.pairComparator, 
+						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
+						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+			
+			iterator.open();
+			
+			boolean moreKeys = true;
+			while (moreKeys) {
+				moreKeys = iterator.callWithNextKey(matcher, collector);
+			}
+			
+			iterator.close();
+	
+			// every expected match must have been produced exactly once
+			for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                    Utilities
+	// --------------------------------------------------------------------------------------------
+
+	
+	
+	static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues(
+			Map<TestData.Key, Collection<TestData.Value>> leftMap,
+			Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	{
+		Map<TestData.Key, Collection<RecordMatch>> map = new HashMap<TestData.Key, Collection<RecordMatch>>();
+
+		for (TestData.Key key : leftMap.keySet()) {
+			Collection<TestData.Value> leftValues = leftMap.get(key);
+			Collection<TestData.Value> rightValues = rightMap.get(key);
+
+			if (rightValues == null) {
+				continue;
+			}
+
+			if (!map.containsKey(key)) {
+				map.put(key, new ArrayList<RecordMatch>());
+			}
+
+			Collection<RecordMatch> matchedValues = map.get(key);
+
+			for (TestData.Value leftValue : leftValues) {
+				for (TestData.Value rightValue : rightValues) {
+					matchedValues.add(new RecordMatch(leftValue, rightValue));
+				}
+			}
+		}
+
+		return map;
+	}
+	
+	/**
+	 * Builds the expected mixed-type join result: for every int key that also
+	 * exists (as a {@code TestData.Key}) in the right map, the cross product of
+	 * the left int values and the right record values.
+	 */
+	static Map<TestData.Key, Collection<RecordIntPairMatch>> matchRecordIntPairValues(
+		Map<Integer, Collection<Integer>> leftMap,
+		Map<TestData.Key, Collection<TestData.Value>> rightMap)
+	{
+		final Map<TestData.Key, Collection<RecordIntPairMatch>> result =
+				new HashMap<TestData.Key, Collection<RecordIntPairMatch>>();
+	
+		for (Map.Entry<Integer, Collection<Integer>> leftEntry : leftMap.entrySet()) {
+			
+			// bridge the int key into the record key space for the lookup
+			final TestData.Key key = new TestData.Key(leftEntry.getKey().intValue());
+			final Collection<TestData.Value> rightValues = rightMap.get(key);
+	
+			// keys without a partner on the right side yield no matches
+			if (rightValues == null) {
+				continue;
+			}
+	
+			Collection<RecordIntPairMatch> matches = result.get(key);
+			if (matches == null) {
+				matches = new ArrayList<RecordIntPairMatch>();
+				result.put(key, matches);
+			}
+	
+			for (Integer leftValue : leftEntry.getValue()) {
+				for (TestData.Value rightValue : rightValues) {
+					matches.add(new RecordIntPairMatch(leftValue, rightValue));
+				}
+			}
+		}
+	
+		return result;
+	}
+
+	
+	static Map<TestData.Key, Collection<TestData.Value>> collectRecordData(MutableObjectIterator<Record> iter)
+	throws Exception
+	{
+		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
+		Record pair = new Record();
+		
+		while ((pair = iter.next(pair)) != null) {
+
+			TestData.Key key = pair.getField(0, TestData.Key.class);
+			if (!map.containsKey(key)) {
+				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+			}
+
+			Collection<TestData.Value> values = map.get(key);
+			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+		}
+
+		return map;
+	}
+	
+	/**
+	 * Drains the given IntPair iterator and groups the int values by their int key.
+	 */
+	static Map<Integer, Collection<Integer>> collectIntPairData(MutableObjectIterator<IntPair> iter)
+	throws Exception
+	{
+		final Map<Integer, Collection<Integer>> result = new HashMap<Integer, Collection<Integer>>();
+		IntPair pair = new IntPair();
+		
+		while ((pair = iter.next(pair)) != null) {
+			final Integer key = Integer.valueOf(pair.getKey());
+
+			Collection<Integer> values = result.get(key);
+			if (values == null) {
+				values = new ArrayList<Integer>();
+				result.put(key, values);
+			}
+
+			values.add(Integer.valueOf(pair.getValue()));
+		}
+
+		return result;
+	}
+
+	/**
+	 * Pair of a left and a right {@link Value}, used as map entry for the
+	 * expected matches. Equality is component-wise.
+	 */
+	static class RecordMatch {
+		
+		private final Value left;
+		private final Value right;
+
+		public RecordMatch(Value left, Value right) {
+			this.left = left;
+			this.right = right;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			// Guard against null and foreign types instead of throwing a
+			// ClassCastException, as required by the Object.equals(Object) contract.
+			if (!(obj instanceof RecordMatch)) {
+				return false;
+			}
+			final RecordMatch o = (RecordMatch) obj;
+			return this.left.equals(o.left) && this.right.equals(o.right);
+		}
+		
+		@Override
+		public int hashCode() {
+			return this.left.hashCode() ^ this.right.hashCode();
+		}
+
+		@Override
+		public String toString() {
+			return left + ", " + right;
+		}
+	}
+	
+	/**
+	 * Pair of an {@code int} (from the IntPair side) and a {@link Value} (from the
+	 * Record side), used as map entry for the expected mixed-type matches.
+	 * Equality is component-wise.
+	 */
+	static class RecordIntPairMatch
+	{
+		private final int left;
+		private final Value right;
+
+		public RecordIntPairMatch(int left, Value right) {
+			this.left = left;
+			this.right = right;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			// Guard against null and foreign types instead of throwing a
+			// ClassCastException, as required by the Object.equals(Object) contract.
+			if (!(obj instanceof RecordIntPairMatch)) {
+				return false;
+			}
+			final RecordIntPairMatch o = (RecordIntPairMatch) obj;
+			return this.left == o.left && this.right.equals(o.right);
+		}
+		
+		@Override
+		public int hashCode() {
+			return this.left ^ this.right.hashCode();
+		}
+
+		@Override
+		public String toString() {
+			return left + ", " + right;
+		}
+	}
+	
+	/**
+	 * Join function that removes each produced (Record, Record) match from the map
+	 * of expected matches; a produced match that is not expected fails the test.
+	 */
+	static final class RecordMatchRemovingJoin extends JoinFunction
+	{
+		private final Map<TestData.Key, Collection<RecordMatch>> toRemoveFrom;
+		
+		protected RecordMatchRemovingJoin(Map<TestData.Key, Collection<RecordMatch>> map) {
+			this.toRemoveFrom = map;
+		}
+		
+		@Override
+		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
+		{
+			final TestData.Key key = rec1.getField(0, TestData.Key.class);
+			final TestData.Value leftValue = rec1.getField(1, TestData.Value.class);
+			final TestData.Value rightValue = rec2.getField(1, TestData.Value.class);
+			
+			final Collection<RecordMatch> expected = this.toRemoveFrom.get(key);
+			if (expected == null) {
+				Assert.fail("Match " + key + " - " + leftValue + ":" + rightValue + " is unexpected.");
+			}
+			
+			Assert.assertTrue("Produced match was not contained: " + key + " - " + leftValue + ":" + rightValue,
+				expected.remove(new RecordMatch(leftValue, rightValue)));
+			
+			// drop fully consumed keys so the final emptiness check is cheap
+			if (expected.isEmpty()) {
+				this.toRemoveFrom.remove(key);
+			}
+		}
+	}
+	
+	/**
+	 * Join function that removes each produced (IntPair, Record) match from the map
+	 * of expected matches; a produced match that is not expected fails the test.
+	 */
+	static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
+	{
+		private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
+		
+		protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, Collection<RecordIntPairMatch>> map) {
+			this.toRemoveFrom = map;
+		}
+		
+		@Override
+		public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
+		{
+			final int pairKey = rec1.getKey();
+			final int pairValue = rec1.getValue();
+			
+			final TestData.Key key = rec2.getField(0, TestData.Key.class);
+			final TestData.Value value = rec2.getField(1, TestData.Value.class);
+			
+			// the join may only pair elements with equal keys
+			Assert.assertTrue("Key does not match for matching IntPair Record combination.", pairKey == key.getKey()); 
+			
+			final Collection<RecordIntPairMatch> expected = this.toRemoveFrom.get(key);
+			if (expected == null) {
+				Assert.fail("Match " + key + " - " + pairValue + ":" + value + " is unexpected.");
+			}
+			
+			Assert.assertTrue("Produced match was not contained: " + key + " - " + pairValue + ":" + value,
+				expected.remove(new RecordIntPairMatch(pairValue, value)));
+			
+			// drop fully consumed keys so the final emptiness check is cheap
+			if (expected.isEmpty()) {
+				this.toRemoveFrom.remove(key);
+			}
+		}
+	}
+	
+	/**
+	 * Pair comparator checking an {@link IntPair} reference against the first
+	 * (int) field of a {@link Record} candidate.
+	 */
+	static final class IntPairRecordPairComparator extends TypePairComparator<IntPair, Record>
+	{
+		/** Key of the IntPair most recently set as reference. */
+		private int reference;
+		
+		@Override
+		public void setReference(IntPair reference) {
+			this.reference = reference.getKey();
+		}
+
+		@Override
+		public boolean equalToReference(Record candidate) {
+			try {
+				final IntValue i = candidate.getField(0, IntValue.class);
+				return i.getValue() == this.reference;
+			} catch (NullPointerException npex) {
+				throw new NullKeyFieldException();
+			}
+		}
+
+		@Override
+		public int compareToReference(Record candidate) {
+			try {
+				final int key = candidate.getField(0, IntValue.class).getValue();
+				// Compare by sign rather than subtracting: "key - reference" can
+				// overflow for operands of opposite sign and report the wrong order.
+				return (key < this.reference) ? -1 : ((key == this.reference) ? 0 : 1);
+			} catch (NullPointerException npex) {
+				throw new NullKeyFieldException();
+			}
+		}
+	}
+	
+	/**
+	 * Pair comparator checking a {@link Record} reference (first int field)
+	 * against the key of an {@link IntPair} candidate.
+	 */
+	static final class RecordIntPairPairComparator extends TypePairComparator<Record, IntPair>
+	{
+		/** Key extracted from the Record most recently set as reference. */
+		private int reference;
+		
+		@Override
+		public void setReference(Record reference) {
+			this.reference = reference.getField(0, IntValue.class).getValue();
+		}
+
+		@Override
+		public boolean equalToReference(IntPair candidate) {
+			return this.reference == candidate.getKey();
+		}
+
+		@Override
+		public int compareToReference(IntPair candidate) {
+			final int key = candidate.getKey();
+			// Compare by sign rather than subtracting: "key - reference" can
+			// overflow for operands of opposite sign and report the wrong order.
+			return (key < this.reference) ? -1 : ((key == this.reference) ? 0 : 1);
+		}
+	}
+}


Mime
View raw message