drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
Date Mon, 22 May 2017 22:59:04 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020352#comment-16020352 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r117863850
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
    @@ -546,44 +1254,204 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
          holder.value = vv0.getAccessor().get(incomingRowIdx) ;
          }
          */
    +    /*
    +    if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
    +      // for debugging -- show the first row from a spilled batch
    +      Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
    +      Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
    +      Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
    +
    +      if (tmp0 != null && tmp1 != null && tmp2 != null) {
    +        NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
    +        NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
    +        NullableBigIntVector  vv2 = ((NullableBigIntVector) tmp2);
    +        logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
    +      }
    +    }
    +    */
    +    // The hash code is computed once, then its lower bits are used to determine the
    +    // partition to use, and the higher bits determine the location in the hash table.
    +    int hashCode;
    +    try {
    +      htables[0].updateBatches();
    +      hashCode = htables[0].getHashCode(incomingRowIdx);
    +    } catch (SchemaChangeException e) {
    +      throw new IllegalStateException("Unexpected schema change", e);
    +    }
     
    -    htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
    +    // right shift hash code for secondary (or tertiary...) spilling
    +    for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }
     
    +    int currentPartition = hashCode & partitionMask ;
    +    hashCode >>>= bitsInMask;
    +    HashTable.PutStatus putStatus = null;
    +    long allocatedBefore = allocator.getAllocatedMemory();
    +
    +    // Insert the key columns into the hash table
    +    try {
    +      putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
    +    } catch (OutOfMemoryException exc) {
    +      throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
    +    } catch (SchemaChangeException e) {
    +      throw new IllegalStateException("Unexpected schema change", e);
    +    }
         int currentIdx = htIdxHolder.value;
     
    -    // get the batch index and index within the batch
    -    if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
    -      addBatchHolder();
    +    long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
    +    if ( addedMem > 0 ) {
    +      logger.trace("MEMORY CHECK HT: allocated {}  added {} partition {}",allocatedBefore,addedMem,currentPartition);
         }
    -    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
    +
    +    // Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch
    +    // (for the aggregate columns) needs to be created
    +    if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
    +      try {
    +        long allocatedBeforeAggCol = allocator.getAllocatedMemory();
    +
    +        addBatchHolder(currentPartition);
    +
    +        if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
    +        long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
    --- End diff --
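The comment in the diff says the hash code is computed once, its lower bits pick the partition, and the higher bits locate the row in the hash table; on a secondary (or later) spill cycle the code is first shifted right by `bitsInMask` once per cycle. A minimal Java sketch of that bit-splitting, assuming a power-of-two partition count. The class and method names (`HashPartitionSketch`, `partitionFor`, `remainingHash`) are hypothetical and this is not Drill's implementation:

```java
// Hypothetical sketch: split one hash code into a partition index (low bits)
// and a remaining hash for the in-memory table (higher bits). Not Drill code.
public final class HashPartitionSketch {

  private final int bitsInMask;     // bits consumed per spill cycle
  private final int partitionMask;  // numPartitions - 1

  public HashPartitionSketch(int numPartitions) {
    // Assumption: the partition count is a power of two, so a mask works.
    if (numPartitions <= 0 || Integer.bitCount(numPartitions) != 1) {
      throw new IllegalArgumentException("numPartitions must be a power of two");
    }
    this.bitsInMask = Integer.numberOfTrailingZeros(numPartitions);
    this.partitionMask = numPartitions - 1;
  }

  /** Drops the bits already used by earlier cycles (cycleNum = 0 on the first pass). */
  private int shiftForCycle(int hashCode, int cycleNum) {
    for (int i = 0; i < cycleNum; i++) {
      hashCode >>>= bitsInMask;
    }
    return hashCode;
  }

  /** Low bits of the (shifted) hash choose the partition. */
  public int partitionFor(int hashCode, int cycleNum) {
    return shiftForCycle(hashCode, cycleNum) & partitionMask;
  }

  /** The bits above the partition bits are left for the hash table itself. */
  public int remainingHash(int hashCode, int cycleNum) {
    return shiftForCycle(hashCode, cycleNum) >>> bitsInMask;
  }

  public static void main(String[] args) {
    HashPartitionSketch p = new HashPartitionSketch(4); // 2 bits per cycle
    int hash = 0b1011_0110;                              // 182
    System.out.println(p.partitionFor(hash, 0));  // low bits 10 -> prints 2
    System.out.println(p.partitionFor(hash, 1));  // next bits 01 -> prints 1
    System.out.println(p.remainingHash(hash, 0)); // 182 >>> 2 -> prints 45
  }
}
```

Shifting once per cycle in a loop, as the diff does, rather than by `cycleNum * bitsInMask` in one step, sidesteps Java's masking of `int` shift distances to their low 5 bits, so after enough cycles the hash simply drains to 0.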
    
    totalAddedMem covers both the group-by keys and the aggregate columns.
    allocatedBefore is the allocation size at the start; then the keys are added to the hash table (allocatedBeforeAggCol records the size at that point), then a batch holder for the aggregate columns is added, and the total is computed.
    The total matters because, for example, if the incoming batch becomes bigger, we use it to adjust the estimate for the batch size.
    allocatedBeforeAggCol only adds some tracing refinement, telling which batch grew (the one in the hash table or the one for the aggregate columns).
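The accounting described in the reply takes two baselines: one before the hash-table put and one (allocatedBeforeAggCol) before the aggregate-column batch holder is added; the total spans both steps, while the intermediate baseline splits it for tracing. A minimal Java sketch of that arithmetic, assuming a hypothetical counting allocator in place of the real one (the diff only shows that it exposes getAllocatedMemory()); the byte counts are made-up inputs:

```java
// Hypothetical sketch of the two-baseline memory accounting; not Drill code.
public final class BatchMemoryAccounting {

  /** Hypothetical stand-in for the allocator: it only counts bytes. */
  static final class CountingAllocator {
    private long allocated;
    long getAllocatedMemory() { return allocated; }
    void allocate(long bytes) { allocated += bytes; }
  }

  /** Byte deltas measured around one insert that added new batches. */
  static final class Growth {
    final long keyBytes;     // growth inside the hash table (group-by keys)
    final long aggColBytes;  // growth from the new aggregate-column batch holder
    final long totalBytes;   // both together; the figure used to re-estimate batch size
    Growth(long keyBytes, long aggColBytes, long totalBytes) {
      this.keyBytes = keyBytes;
      this.aggColBytes = aggColBytes;
      this.totalBytes = totalBytes;
    }
  }

  /** Simulates: put keys (may add a key batch), then add a matching agg-column batch. */
  static Growth insertWithNewBatch(CountingAllocator allocator,
                                   long keyBatchBytes, long aggBatchBytes) {
    long allocatedBefore = allocator.getAllocatedMemory();
    allocator.allocate(keyBatchBytes);                 // stands in for the hash-table put
    long allocatedBeforeAggCol = allocator.getAllocatedMemory();
    allocator.allocate(aggBatchBytes);                 // stands in for addBatchHolder(...)
    long after = allocator.getAllocatedMemory();
    return new Growth(allocatedBeforeAggCol - allocatedBefore,
                      after - allocatedBeforeAggCol,
                      after - allocatedBefore);
  }

  public static void main(String[] args) {
    CountingAllocator allocator = new CountingAllocator();
    allocator.allocate(1_000);                         // memory already in use
    Growth g = insertWithNewBatch(allocator, 256, 512);
    System.out.println(g.keyBytes + " " + g.aggColBytes + " " + g.totalBytes); // prints "256 512 768"
  }
}
```

Because both deltas are measured against the same running counter, totalBytes always equals keyBytes + aggColBytes; the intermediate baseline costs one extra read and only serves to attribute the growth.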
     


> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradually spilling memory to disk as the available memory becomes too small to allow in-memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
