flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3385) Fix outer join skipping unprobed partitions
Date Wed, 10 Feb 2016 19:59:18 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15141551#comment-15141551 ]

Fabian Hueske commented on FLINK-3385:
--------------------------------------

Thanks [~greghogan]! This is obviously a serious issue with the build-side outer join.
The {{HashTableITCase}} has a few tests that force spilling to disk by giving the hash table very little memory. It should be easy to port your code into that test class.

This bug should be fixed for the upcoming 1.0.0 release. 
Do you want to do it?

> Fix outer join skipping unprobed partitions
> -------------------------------------------
>
>                 Key: FLINK-3385
>                 URL: https://issues.apache.org/jira/browse/FLINK-3385
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>            Reporter: Greg Hogan
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> {{MutableHashTable.nextRecord}} performs three steps for a build-side outer join:
> {code}
> 	public boolean nextRecord() throws IOException {
> 		if (buildSideOuterJoin) {
> 			return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
> 		} else {
> 			return processProbeIter() || prepareNextPartition();
> 		}
> 	}
> {code}
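Because {{||}} short-circuits, {{processUnmatchedBuildIter}} only runs once the probe iterator of the current partition is exhausted, and {{prepareNextPartition}} only runs once both earlier phases return false. The toy model below illustrates that ordering. It is a hypothetical sketch, not Flink code: {{PhaseModel}}, its {{Partition}} type, and the assumption that {{prepareNextPartition}} loads the next pending partition and then retries {{nextRecord}} are simplifications made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Toy model of the phase ordering in MutableHashTable.nextRecord(); not Flink code.
class PhaseModel {
    // Hypothetical partition: its probe-side records and the build-side records
    // that no probe record matched.
    static final class Partition {
        final List<String> probe;
        final List<String> unmatchedBuild;

        Partition(List<String> probe, List<String> unmatchedBuild) {
            this.probe = probe;
            this.unmatchedBuild = unmatchedBuild;
        }
    }

    private final boolean buildSideOuterJoin;
    private final Deque<Partition> pending;
    private final Deque<String> probeIter = new ArrayDeque<>();
    private final Deque<String> unmatchedBuildIter = new ArrayDeque<>();
    private String current; // record produced by the last successful nextRecord() call

    PhaseModel(boolean buildSideOuterJoin, List<Partition> partitions) {
        this.buildSideOuterJoin = buildSideOuterJoin;
        this.pending = new ArrayDeque<>(partitions);
    }

    boolean nextRecord() {
        // '||' short-circuits: a phase runs only after all earlier phases returned false.
        if (buildSideOuterJoin) {
            return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
        } else {
            return processProbeIter() || prepareNextPartition();
        }
    }

    private boolean processProbeIter() {
        if (probeIter.isEmpty()) {
            return false;
        }
        current = "probe:" + probeIter.poll();
        return true;
    }

    private boolean processUnmatchedBuildIter() {
        if (unmatchedBuildIter.isEmpty()) {
            return false;
        }
        current = "unmatchedBuild:" + unmatchedBuildIter.poll();
        return true;
    }

    private boolean prepareNextPartition() {
        if (pending.isEmpty()) {
            return false; // nothing left: the join is finished
        }
        Partition next = pending.poll();
        probeIter.addAll(next.probe);
        unmatchedBuildIter.addAll(next.unmatchedBuild);
        return nextRecord(); // assumption: retry the earlier phases on the new partition
    }

    // Drains the model and returns every record in the order it was produced.
    static List<String> drain(boolean buildSideOuterJoin, List<Partition> partitions) {
        PhaseModel model = new PhaseModel(buildSideOuterJoin, partitions);
        List<String> out = new ArrayList<>();
        while (model.nextRecord()) {
            out.add(model.current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Partition> parts = Arrays.asList(
                new Partition(Arrays.asList("p1"), Arrays.asList("b1")),
                new Partition(Arrays.asList("p2"), Collections.<String>emptyList()));
        System.out.println(drain(true, parts));  // [probe:p1, unmatchedBuild:b1, probe:p2]
        System.out.println(drain(false, parts)); // [probe:p1, probe:p2]
    }
}
```

In the outer-join case the unmatched build-side records of a partition are emitted between its probe records and the switch to the next partition, so anything that partition fails to expose at that point is never revisited.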
> {{MutableHashTable.processUnmatchedBuildIter}} eventually calls through to {{MutableHashTable.moveToNextBucket}}, which is unable to process spilled partitions:
> {code}
> 			if (p.isInMemory()) {
> 				...
> 			} else {
> 				return false;
> 			}
> {code}
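A toy model of why returning false for a spilled partition loses records. It assumes (the snippet does not show this) that the caller of {{moveToNextBucket}} reads false as "no more buckets"; under that assumption the scan stops at the first bucket owned by a spilled partition, so later in-memory buckets are skipped as well. {{BucketScanModel}}, {{Bucket}} and both scan methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (not Flink code): hash-table buckets, each owned by a partition that is either
// in memory or spilled, holding build-side records that found no probe-side match.
class BucketScanModel {
    static final class Bucket {
        final boolean partitionInMemory;
        final List<String> unmatched; // for a spilled partition these records live on disk

        Bucket(boolean partitionInMemory, List<String> unmatched) {
            this.partitionInMemory = partitionInMemory;
            this.unmatched = unmatched;
        }
    }

    // Assumed caller behaviour: a false return from the bucket-advance step ends the scan,
    // so the first bucket of a spilled partition hides every bucket after it.
    static List<String> scanStoppingAtSpilled(List<Bucket> buckets) {
        List<String> out = new ArrayList<>();
        for (Bucket b : buckets) {
            if (!b.partitionInMemory) {
                break; // models 'return false' being read as "no more buckets"
            }
            out.addAll(b.unmatched);
        }
        return out;
    }

    // Alternative: skip buckets of spilled partitions and keep scanning. Records of spilled
    // partitions are still missing; they can only be produced when the partition is re-read.
    static List<String> scanSkippingSpilled(List<Bucket> buckets) {
        List<String> out = new ArrayList<>();
        for (Bucket b : buckets) {
            if (b.partitionInMemory) {
                out.addAll(b.unmatched);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Bucket> buckets = Arrays.asList(
                new Bucket(true, Arrays.asList("a")),
                new Bucket(false, Arrays.asList("x")),
                new Bucket(true, Arrays.asList("b")));
        System.out.println(scanStoppingAtSpilled(buckets)); // [a]
        System.out.println(scanSkippingSpilled(buckets));   // [a, b]
    }
}
```

Even the skipping variant never emits "x": the unmatched records of a spilled partition can only come out if the partition survives until it is reloaded, which is what the next step has to guarantee.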
> {{MutableHashTable.prepareNextPartition}} calls {{HashPartition.finalizeProbePhase}}, which only retains a spilled partition (registering it to be read and processed by the next instantiation of {{MutableHashTable}}) if probe-side records were spilled; otherwise it deletes the spill files. For an inner join this is fine, but for a build-side outer join the unmatched build-side records must still be retained and emitted. No further probing is necessary for such a partition, so could its processing be short-circuited when it is loaded by the next {{MutableHashTable}}?
> {code}
> 		if (isInMemory()) {
> 			...
> 		}
> 		else if (this.probeSideRecordCounter == 0) {
> 			// partition is empty, no spilled buffers
> 			// return the memory buffer
> 			freeMemory.add(this.probeSideBuffer.getCurrentSegment());
> 			// delete the spill files
> 			this.probeSideChannel.close();
> 			this.buildSideChannel.deleteChannel();
> 			this.probeSideChannel.deleteChannel();
> 			return 0;
> 		}
> 		else {
> 			// flush the last probe side buffer and register this partition as pending
> 			this.probeSideBuffer.close();
> 			this.probeSideChannel.close();
> 			spilledPartitions.add(this);
> 			return 1;
> 		}
> {code}
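The decision in the spilled branch above can be written as a small function. This is a hypothetical sketch: {{FinalizeProbeModel}}, {{Action}} and the {{hasBuildRecords}} input are illustrative names, not Flink API, and the outer-join-aware variant is one possible direction rather than the actual fix.

```java
// Toy model (not Flink code) of the spilled-partition branch of
// HashPartition.finalizeProbePhase quoted above.
class FinalizeProbeModel {
    enum Action { DELETE_SPILL_FILES, KEEP_AS_PENDING }

    // Behaviour of the quoted snippet: a spilled partition without spilled probe-side
    // records is deleted, whatever the join type.
    static Action current(long probeSideRecordCounter) {
        return probeSideRecordCounter == 0 ? Action.DELETE_SPILL_FILES : Action.KEEP_AS_PENDING;
    }

    // Hypothetical outer-join-aware variant. With no probe-side records, every build-side
    // record of the partition is unmatched, so a build-side outer join must keep it.
    // 'hasBuildRecords' is an assumed input, not a Flink field.
    static Action outerJoinAware(long probeSideRecordCounter,
                                 boolean buildSideOuterJoin,
                                 boolean hasBuildRecords) {
        if (probeSideRecordCounter > 0) {
            return Action.KEEP_AS_PENDING;
        }
        if (buildSideOuterJoin && hasBuildRecords) {
            return Action.KEEP_AS_PENDING; // its unmatched build records still have to be emitted
        }
        return Action.DELETE_SPILL_FILES;
    }

    public static void main(String[] args) {
        System.out.println(current(0));                     // DELETE_SPILL_FILES
        System.out.println(outerJoinAware(0, true, true));  // KEEP_AS_PENDING
        System.out.println(outerJoinAware(0, false, true)); // DELETE_SPILL_FILES
    }
}
```

When such a partition is kept with zero probe-side records, every one of its build-side records is unmatched. That is presumably why the short-circuit asked about above seems plausible: on reload the records could be emitted directly as unmatched, without building a hash table to probe.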



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
