drill-dev mailing list archives

From Ben-Zvi <...@git.apache.org>
Subject [GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Date Mon, 22 May 2017 17:26:52 GMT
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r117801284
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---
    @@ -544,86 +572,69 @@ private static int roundUpToPowerOf2(int number) {
         return rounded;
       }
     
    -  @Override
    -  public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) {
    -    put(incomingRowIdx, htIdxHolder);
    +  public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
    +    return getHashBuild(incomingRowIdx);
       }
     
    -  private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) {
    +  /** put() uses the hash code (from getHashCode() above) to insert the key(s) from the incoming
    +   * row into the hash table. The code selects the bucket in the startIndices, then the keys are
    +   * placed into the chained list - by storing the key values into a batch, and updating its
    +   * "links" member. Last, it modifies the index holder to the batch offset so that the caller
    +   * can store the remaining parts of the row into a matching batch (outside the hash table).
    +   *
    +   * @param incomingRowIdx - position of the incoming row
    +   * @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch)
    +   * @param hashCode - computed over the key(s) by calling getHashCode()
    +   * @return Status - the key(s) was ADDED or was already PRESENT
    +   */
    +  @Override
    +  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException {
     
    -    int hash = getHashBuild(incomingRowIdx);
    -    int i = getBucketIndex(hash, numBuckets());
    -    int startIdx = startIndices.getAccessor().get(i);
    +    int bucketIndex = getBucketIndex(hashCode, numBuckets());
    +    int startIdx = startIndices.getAccessor().get(bucketIndex);
         int currentIdx;
    -    int currentIdxWithinBatch;
    -    BatchHolder bh;
         BatchHolder lastEntryBatch = null;
         int lastEntryIdxWithinBatch = EMPTY_SLOT;
     
    +    // if startIdx is non-empty, follow the hash chain links until we find a matching
    +    // key or reach the end of the chain (and remember the last link there)
    +    for ( currentIdxHolder.value = startIdx;
    +          currentIdxHolder.value != EMPTY_SLOT;
    +          /* isKeyMatch() below also advances the currentIdxHolder to the next link */) {
     
    -    if (startIdx == EMPTY_SLOT) {
    -      // this is the first entry in this bucket; find the first available slot in the
    -      // container of keys and values
    -      currentIdx = freeIndex++;
    -      addBatchIfNeeded(currentIdx);
    +      // remember the current link, which would be the last when the next link is empty
    +      lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
    +      lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
     
    -      if (EXTRA_DEBUG) {
    -        logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i,
    -            incomingRowIdx, currentIdx);
    +      if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
    +        htIdxHolder.value = currentIdxHolder.value;
    +        return PutStatus.KEY_PRESENT;
           }
    -
    -      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
    -      // update the start index array
    -      startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
    -      htIdxHolder.value = currentIdx;
    -      return PutStatus.KEY_ADDED;
         }
     
    -    currentIdx = startIdx;
    -    boolean found = false;
    -
    -    bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
    -    currentIdxHolder.value = currentIdx;
    -
    -    // if startIdx is non-empty, follow the hash chain links until we find a matching
    -    // key or reach the end of the chain
    -    while (true) {
    -      currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
    +    // no match was found, so insert a new entry
    +    currentIdx = freeIndex++;
    +    boolean addedBatch = addBatchIfNeeded(currentIdx);
     
    -      if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
    -        htIdxHolder.value = currentIdxHolder.value;
    -        found = true;
    -        break;
    -      } else if (currentIdxHolder.value == EMPTY_SLOT) {
    -        lastEntryBatch = bh;
    -        lastEntryIdxWithinBatch = currentIdxWithinBatch;
    -        break;
    -      } else {
    -        bh = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
    -        lastEntryBatch = bh;
    -      }
    +    if (EXTRA_DEBUG) {
    +      logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
         }
     
    -    if (!found) {
    -      // no match was found, so insert a new entry
    -      currentIdx = freeIndex++;
    -      addBatchIfNeeded(currentIdx);
    +    insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
     
    -      if (EXTRA_DEBUG) {
    -        logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
    -      }
    -
    -      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
    -      htIdxHolder.value = currentIdx;
    -      return PutStatus.KEY_ADDED;
    +    // if there was no hash chain at this bucket, need to update the start index array
    +    if (startIdx == EMPTY_SLOT) {
    +      startIndices.getMutator().setSafe(getBucketIndex(hashCode, numBuckets()), currentIdx);
         }
    -
    -    return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
    +    htIdxHolder.value = currentIdx;
    +    return  addedBatch ? PutStatus.NEW_BATCH_ADDED :
    +        ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
    --- End diff --
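    For context, the flow the new put() Javadoc above describes - select a bucket in startIndices,
    walk the chained list via the "links", and append at the tail if no key matches - boils down to
    the pattern in the simplified sketch below. SimpleChainedTable and its plain int arrays are
    illustrative stand-ins only, not Drill's HashTableTemplate; the real code keeps keys in
    value-vector batches and packs a batch number plus an in-batch offset into one int (hence the
    ">>> 16" / BATCH_MASK arithmetic in the diff).

        // Simplified, illustrative sketch only -- not Drill code.
        import java.util.Arrays;

        public class SimpleChainedTable {
          public enum PutStatus { KEY_PRESENT, KEY_ADDED }

          private static final int EMPTY_SLOT = -1;

          private final int[] startIndices;  // head of the chain for each bucket
          private final long[] keys;         // stored key values (the "batch")
          private final int[] links;         // next entry in the chain, or EMPTY_SLOT
          private int freeIndex = 0;

          public SimpleChainedTable(int numBuckets, int capacity) {
            startIndices = new int[numBuckets];
            Arrays.fill(startIndices, EMPTY_SLOT);
            keys = new long[capacity];
            links = new int[capacity];
            Arrays.fill(links, EMPTY_SLOT);
          }

          /** Insert a key using a precomputed hash code; report whether it was already present. */
          public PutStatus put(long key, int hashCode) {
            int bucketIndex = (hashCode & 0x7fffffff) % startIndices.length;
            int startIdx = startIndices[bucketIndex];
            int lastIdx = EMPTY_SLOT;

            // Follow the chain until a matching key is found or the chain ends,
            // remembering the last link so a new entry can be appended there.
            for (int idx = startIdx; idx != EMPTY_SLOT; idx = links[idx]) {
              if (keys[idx] == key) {
                return PutStatus.KEY_PRESENT;
              }
              lastIdx = idx;
            }

            // No match: place the key at the next free slot and link it in.
            int currentIdx = freeIndex++;
            keys[currentIdx] = key;
            if (startIdx == EMPTY_SLOT) {
              startIndices[bucketIndex] = currentIdx;  // first entry in this bucket
            } else {
              links[lastIdx] = currentIdx;             // chain off the previous tail
            }
            return PutStatus.KEY_ADDED;
          }
        }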
    
    The former: the last key in a batch. The Hash Aggr needs this information to initiate a check
    for memory pressure (it could calculate this itself, but it looks cleaner/simpler to get it as
    a special status from the hash table). Hopefully we'd never have to deal with batches of size 1
    (row), as that would conflict - both "last row" and "new batch" at the same insertion.
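
    To make the use of such a special status concrete, here is a hypothetical caller-side sketch
    (not the actual HashAggTemplate code): the aggregator asks the table for the hash code once,
    passes it to put(), and treats a batch boundary in the returned status as the trigger for a
    memory check. Only the KEY_PRESENT / KEY_ADDED / NEW_BATCH_ADDED names visible in the diff are
    used here; consumeRow(), estimateMemoryUse() and spillSomePartition() are illustrative
    placeholders.

        // Hypothetical caller-side sketch -- illustrative only, not Drill's HashAggTemplate.
        public class AggCallerSketch {

          enum PutStatus { KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED }  // names as in the diff above

          interface KeysTable {
            int getHashCode(int incomingRowIdx);
            PutStatus put(int incomingRowIdx, int[] idxHolder, int hashCode);
          }

          private final KeysTable hashTable;
          private final long memoryLimit;

          AggCallerSketch(KeysTable hashTable, long memoryLimit) {
            this.hashTable = hashTable;
            this.memoryLimit = memoryLimit;
          }

          void consumeRow(int incomingRowIdx) {
            // The hash code is computed once by the table and handed back to put(),
            // so the caller can reuse it as well (e.g. for picking a spill partition).
            int hashCode = hashTable.getHashCode(incomingRowIdx);

            int[] idxHolder = new int[1];  // receives batch + batch-offset of the key
            PutStatus status = hashTable.put(incomingRowIdx, idxHolder, hashCode);

            if (status == PutStatus.KEY_PRESENT) {
              updateAggregates(idxHolder[0], incomingRowIdx);  // existing group
              return;
            }
            addNewGroup(idxHolder[0], incomingRowIdx);         // a new group key was added

            // A batch boundary (here: a new keys batch was allocated) is a natural point
            // to check memory pressure once per batch rather than on every row.
            if (status == PutStatus.NEW_BATCH_ADDED && estimateMemoryUse() > memoryLimit) {
              spillSomePartition();  // placeholder for the spill logic DRILL-5457 adds
            }
          }

          // Placeholders -- the real implementations live in the aggregator.
          private void updateAggregates(int groupIdx, int rowIdx) { }
          private void addNewGroup(int groupIdx, int rowIdx) { }
          private long estimateMemoryUse() { return 0; }
          private void spillSomePartition() { }
        }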



