flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...
Date Thu, 21 Jan 2016 11:46:01 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50390858
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
---
    @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception {
     		join.close();
     		this.memManager.release(join.getFreedMemory());
     	}
    +
    +	@Test
    +	public void testHashWithBuildSideOuterJoin1() throws Exception {
    +		final int NUM_KEYS = 20000;
    +		final int BUILD_VALS_PER_KEY = 1;
    +		final int PROBE_VALS_PER_KEY = 1;
    +
    +		// create a build input that gives 40000 pairs, with 1 value per key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS,
BUILD_VALS_PER_KEY, false);
    +
    +		// create a probe input that gives 20000 pairs, with 1 value per key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS,
PROBE_VALS_PER_KEY, true);
    +
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is the minimum number of pages required to perform a hash join on these inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +
    +		// ----------------------------------------------------------------------------------------
    +
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair,
IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			while (buildSide.next(recordReuse) != null) {
    +				numRecordsInJoinResult++;
    +			}
    +		}
    +		Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY
* PROBE_VALS_PER_KEY, numRecordsInJoinResult);
    +
    +		join.close();
    +		this.memManager.release(join.getFreedMemory());
    +	}
    +	
    +	@Test
    +	public void testHashWithBuildSideOuterJoin2() throws Exception {
    +		final int NUM_KEYS = 40000;
    +		final int BUILD_VALS_PER_KEY = 2;
    +		final int PROBE_VALS_PER_KEY = 1;
    +		
    +		// The probe and build sides have the same keys, so no build elements remain
    +		// unmatched after the probe phase.
    +		
    +		// create a build input that gives 80000 pairs: 40000 keys with 2 values each
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS,
BUILD_VALS_PER_KEY, false);
    +		
    +		// create a probe input that gives 40000 pairs: 40000 keys with 1 value each
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS,
PROBE_VALS_PER_KEY, true);
    +		
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is the minimum number of pages required to perform a hash join on these inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +		
    +		// ----------------------------------------------------------------------------------------
    +		
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair,
IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +		
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +		
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			IntPair next = buildSide.next(recordReuse);
    --- End diff --
    
    OK, maybe add a brief comment to make clear which behavior is tested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message