flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...
Date Wed, 13 Jan 2016 21:54:57 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656801
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
	/**
	 * Iterates over all build-side records kept in memory that have not been probed during
	 * the probe phase. (Used to produce the build-side records of an outer join that found
	 * no matching probe-side record.)
	 */
	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT>
	{

		// serializer used to deserialize build-side records out of partition memory
		private final TypeSerializer<BT> accessor;

		// total number of hash buckets in the table; the scan cursor runs from 0 to this bound
		private final long totalBucketNumber;

		// shift amount to map a bucket index to the index of its memory segment
		private final int bucketsPerSegmentBits;

		// mask to map a bucket index to its position within a memory segment
		private final int bucketsPerSegmentMask;

		// memory segments holding the bucket headers
		private final MemorySegment[] buckets;

		// partitions of the current hash table; only in-memory partitions are scanned here
		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;

		// bit set view over a bucket's header, marking which entries were probed
		private final BitSet probedSet;

		// --- cursor state for the bucket (or overflow bucket) currently being read ---

		// segment containing the current bucket
		private MemorySegment bucket;

		// overflow segments of the current bucket's partition
		private MemorySegment[] overflowSegments;

		// partition the current bucket belongs to
		private HashPartition<BT, PT> partition;

		// index of the bucket the scan cursor currently points at (-1 before the first bucket)
		private int scanCount;

		// byte offset of the current bucket within its segment
		private int bucketInSegmentOffset;

		// number of entries stored in the current bucket segment
		private int countInSegment;

		// index of the next entry to inspect within the current bucket segment
		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask)
<< NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT,
PT> partition,
    +			int bucketInSegmentOffset) {
    +			this.bucket = bucket;
    +			this.overflowSegments = overflowSegments;
    +			this.partition = partition;
    +			this.bucketInSegmentOffset = bucketInSegmentOffset;
    +			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +			this.numInSegment = 0;
    +		}
    +	
		/**
		 * Returns the next not-yet-probed record of the current bucket chain, deserializing
		 * it into the given reuse instance, or {@code null} when the chain (original bucket
		 * plus all overflow buckets) holds no further unmatched entries.
		 *
		 * @param reuse record instance to deserialize into
		 * @return the next unmatched record of this bucket chain, or {@code null}
		 */
		public BT nextInBucket(BT reuse) {
			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
			while (true) {
				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);

				while (this.numInSegment < this.countInSegment) {
					boolean probed = probedSet.get(numInSegment);
					if (!probed) {
						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
						// entry was never probed: deserialize the record from its partition and return it
						try {
							this.partition.setReadPosition(pointer);
							reuse = this.accessor.deserialize(reuse, this.partition);
							this.numInSegment++;
							return reuse;
						} catch (IOException ioex) {
							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
								ioex.getMessage(), ioex);
						}
					} else {
						// entry found a probe-side match earlier; skip it
						this.numInSegment++;
					}
				}

				// this segment is done. check if there is another chained bucket
				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
					return null;
				}

				// follow the chain: upper 32 bits select the overflow segment, lower 32 bits the offset in it
				final int overflowSegNum = (int) (forwardPointer >>> 32);
				this.bucket = this.overflowSegments[overflowSegNum];
				this.bucketInSegmentOffset = (int) forwardPointer;
				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
				this.numInSegment = 0;
			}
		}
    +	
		/**
		 * Returns the next not-yet-probed record of the current bucket chain as a freshly
		 * deserialized instance, or {@code null} when the chain (original bucket plus all
		 * overflow buckets) holds no further unmatched entries.
		 *
		 * @return the next unmatched record of this bucket chain, or {@code null}
		 */
		public BT nextInBucket() {
			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
			while (true) {
				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);

				while (this.numInSegment < this.countInSegment) {
					boolean probed = probedSet.get(numInSegment);
					if (!probed) {
						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
						// entry was never probed: deserialize the record from its partition and return it
						try {
							this.partition.setReadPosition(pointer);
							BT result = this.accessor.deserialize(this.partition);
							this.numInSegment++;
							return result;
						} catch (IOException ioex) {
							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
								ioex.getMessage(), ioex);
						}
					} else {
						// entry found a probe-side match earlier; skip it
						this.numInSegment++;
					}
				}

				// this segment is done. check if there is another chained bucket
				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
					return null;
				}

				// follow the chain: upper 32 bits select the overflow segment, lower 32 bits the offset in it
				final int overflowSegNum = (int) (forwardPointer >>> 32);
				this.bucket = this.overflowSegments[overflowSegNum];
				this.bucketInSegmentOffset = (int) forwardPointer;
				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
				this.numInSegment = 0;
			}
		}
    +		
    +		public void back() {
    --- End diff --
    
    Can you replace this method by a `hasNext()` method (see previous comment in `processUnmatchedBuildIter()`)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message