drill-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
Date Mon, 22 May 2017 23:09:04 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020372#comment-16020372
] 

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r117865368
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
    @@ -546,44 +1254,204 @@ private void checkGroupAndAggrValues(int incomingRowIdx) {
          holder.value = vv0.getAccessor().get(incomingRowIdx) ;
          }
          */
    +    /*
    +    if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
    +      // for debugging -- show the first row from a spilled batch
    +      Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
    +      Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
    +      Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
    +
    +      if (tmp0 != null && tmp1 != null && tmp2 != null) {
    +        NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
    +        NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
    +        NullableBigIntVector  vv2 = ((NullableBigIntVector) tmp2);
    +        logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx),
vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
    +      }
    +    }
    +    */
    +    // The hash code is computed once, then its lower bits are used to determine the
    +    // partition to use, and the higher bits determine the location in the hash table.
    +    int hashCode;
    +    try {
    +      htables[0].updateBatches();
    +      hashCode = htables[0].getHashCode(incomingRowIdx);
    +    } catch (SchemaChangeException e) {
    +      throw new IllegalStateException("Unexpected schema change", e);
    +    }
     
    -    htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
    +    // right shift hash code for secondary (or tertiary...) spilling
    +    for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }
     
    +    int currentPartition = hashCode & partitionMask ;
    +    hashCode >>>= bitsInMask;
    +    HashTable.PutStatus putStatus = null;
    +    long allocatedBefore = allocator.getAllocatedMemory();
    +
    +    // Insert the key columns into the hash table
    +    try {
    +      putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
    +    } catch (OutOfMemoryException exc) {
    +      throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not
spill
    +    } catch (SchemaChangeException e) {
    +      throw new IllegalStateException("Unexpected schema change", e);
    +    }
         int currentIdx = htIdxHolder.value;
     
    -    // get the batch index and index within the batch
    -    if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
    -      addBatchHolder();
    +    long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
    +    if ( addedMem > 0 ) {
    +      logger.trace("MEMORY CHECK HT: allocated {}  added {} partition {}",allocatedBefore,addedMem,currentPartition);
         }
    -    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
    +
    +    // Check if put() added a new batch (for the keys) inside the hash table, hence a
matching batch
    +    // (for the aggregate columns) needs to be created
    +    if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
    +      try {
    +        long allocatedBeforeAggCol = allocator.getAllocatedMemory();
    +
    +        addBatchHolder(currentPartition);
    +
    +        if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned
batch
    +        long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
    +        logger.trace("MEMORY CHECK AGG: added {}  total (with HT) added {}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem);
    +        // resize the batch estimate if needed (e.g., varchars may take more memory than
estimated)
    +        if ( totalAddedMem > estMaxBatchSize ) {
    +          logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem);
    +          estMaxBatchSize = totalAddedMem;
    +        }
    +      } catch (OutOfMemoryException exc) {
    --- End diff --
    
    addBatchHolder() calls newBatchHolder(), which should allocate the new batch using the
allocator; hence (I think) it may throw an OOM if there is not enough memory left.



> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradually spilling memory to disk as the available memory becomes too small to allow
in-memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message