drill-dev mailing list archives

From amansinha100 <...@git.apache.org>
Subject [GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Date Wed, 17 May 2017 18:42:22 GMT
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r117059683
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -546,44 +1254,204 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
          holder.value = vv0.getAccessor().get(incomingRowIdx) ;
          }
          */
    +    /*
    +    if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
    +      // for debugging -- show the first row from a spilled batch
    +      Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
    +      Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
    +      Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
    +
    +      if (tmp0 != null && tmp1 != null && tmp2 != null) {
    +        NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
    +        NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
    +        NullableBigIntVector  vv2 = ((NullableBigIntVector) tmp2);
    +        logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
    +      }
    +    }
    +    */
    +    // The hash code is computed once, then its lower bits are used to determine the
    +    // partition to use, and the higher bits determine the location in the hash table.
    +    int hashCode;
    +    try {
    +      htables[0].updateBatches();
    +      hashCode = htables[0].getHashCode(incomingRowIdx);
    +    } catch (SchemaChangeException e) {
    +      throw new IllegalStateException("Unexpected schema change", e);
    +    }
     
    -    htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
    +    // right shift hash code for secondary (or tertiary...) spilling
    +    for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }
     
    +    int currentPartition = hashCode & partitionMask ;
    +    hashCode >>>= bitsInMask;
    +    HashTable.PutStatus putStatus = null;
    +    long allocatedBefore = allocator.getAllocatedMemory();
    +
    +    // Insert the key columns into the hash table
    +    try {
    +      putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
    +    } catch (OutOfMemoryException exc) {
    +      throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
    +    } catch (SchemaChangeException e) {
    +      throw new IllegalStateException("Unexpected schema change", e);
    +    }
         int currentIdx = htIdxHolder.value;
     
    -    // get the batch index and index within the batch
    -    if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
    -      addBatchHolder();
    +    long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
    +    if ( addedMem > 0 ) {
    +      logger.trace("MEMORY CHECK HT: allocated {}  added {} partition {}",allocatedBefore,addedMem,currentPartition);
         }
    -    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
    +
    +    // Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch
    +    // (for the aggregate columns) needs to be created
    +    if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
    +      try {
    +        long allocatedBeforeAggCol = allocator.getAllocatedMemory();
    +
    +        addBatchHolder(currentPartition);
    +
    +        if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
    +        long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
    --- End diff --
    
    I presume totalAddedMem is the memory added for the group-by keys. Would it be clearer
to rename it to something like totalAddedMemGroupBy, and allocatedBefore to allocatedBeforeGroupBy,
so that this is explicit? Also, why not use allocatedBeforeAggCol here instead of calling
allocator.getAllocatedMemory() again?
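
To make the naming question concrete, here is a minimal standalone sketch of the snapshot-and-subtract pattern with explicit names. It is not the code in the PR: ToyAllocator, measure(), and the byte counts are hypothetical stand-ins, and only getAllocatedMemory(), allocatedBeforeAggCol, and totalAddedMem mirror names from the diff. The comments assume that put() may allocate a key batch and that addBatchHolder() allocates the matching aggregate-column batch.

```java
import java.util.concurrent.atomic.AtomicLong;

public class MemoryDeltaSketch {

  /** Hypothetical stand-in for an allocator that reports the bytes currently allocated. */
  static final class ToyAllocator {
    private final AtomicLong allocated = new AtomicLong();

    void allocate(long bytes) { allocated.addAndGet(bytes); }

    long getAllocatedMemory() { return allocated.get(); }
  }

  /** Returns { bytes added for keys, bytes added for aggregate columns, total bytes added }. */
  static long[] measure(ToyAllocator allocator, long keyBatchBytes, long aggBatchBytes) {
    long allocatedBeforeGroupBy = allocator.getAllocatedMemory();
    allocator.allocate(keyBatchBytes);   // stands in for put() adding a key batch

    long allocatedBeforeAggCol = allocator.getAllocatedMemory();
    allocator.allocate(aggBatchBytes);   // stands in for addBatchHolder()

    long allocatedAfter = allocator.getAllocatedMemory();

    // Each delta is named after the snapshot it is measured from.
    long addedMemGroupBy = allocatedBeforeAggCol - allocatedBeforeGroupBy;
    long addedMemAggCol = allocatedAfter - allocatedBeforeAggCol;
    long totalAddedMem = allocatedAfter - allocatedBeforeGroupBy;
    return new long[] { addedMemGroupBy, addedMemAggCol, totalAddedMem };
  }

  public static void main(String[] args) {
    long[] deltas = measure(new ToyAllocator(), 4096, 1024);
    System.out.println("keys=" + deltas[0] + " aggCols=" + deltas[1] + " total=" + deltas[2]);
  }
}
```

With three snapshots, each delta states which baseline it is measured from, so a reader does not have to guess whether a "total" includes the key batch.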

