drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansinha100 <...@git.apache.org>
Subject [GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Date Wed, 17 May 2017 18:42:22 GMT
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r117078208
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
    @@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
     
       @Override
       public int getOutputCount() {
    -    // return outputCount;
         return lastBatchOutputCount;
       }
     
       @Override
       public void cleanup() {
    -    if (htable != null) {
    -      htable.clear();
    -      htable = null;
    -    }
    +      if ( schema == null ) { return; } // not set up; nothing to clean
    +      for ( int i = 0; i < numPartitions; i++) {
    +          if (htables[i] != null) {
    +              htables[i].clear();
    +              htables[i] = null;
    +          }
    +          if ( batchHolders[i] != null) {
    +              for (BatchHolder bh : batchHolders[i]) {
    +                    bh.clear();
    +              }
    +              batchHolders[i].clear();
    +              batchHolders[i] = null;
    +          }
    +
    +          // delete any (still active) output spill file
    +          if ( outputStream[i] != null && spillFiles[i] != null) {
    +            try {
    +              spillSet.delete(spillFiles[i]);
    +            } catch(IOException e) {
    +              logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]);
    +            }
    +          }
    +      }
    +      // delete any spill file left in unread spilled partitions
    +      while ( ! spilledPartitionsList.isEmpty() ) {
    +        SpilledPartition sp = spilledPartitionsList.remove(0);
    +        try {
    +          spillSet.delete(sp.spillFile);
    +        } catch(IOException e) {
    +          logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
    +        }
    +      }
    +      spillSet.close(); // delete the spill directory(ies)
         htIdxHolder = null;
         materializedValueFields = null;
         outStartIdxHolder = null;
         outNumRecordsHolder = null;
    +  }
     
    -    if (batchHolders != null) {
    -      for (BatchHolder bh : batchHolders) {
    +  // First free the memory used by the given (spilled) partition (i.e., hash table plus
batches)
    +  // then reallocate them in pristine state to allow the partition to continue receiving
rows
    +  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException,
IOException {
    +    assert htables[part] != null;
    +    htables[part].reset();
    +    if ( batchHolders[part] != null) {
    +      for (BatchHolder bh : batchHolders[part]) {
             bh.clear();
           }
    -      batchHolders.clear();
    -      batchHolders = null;
    +      batchHolders[part].clear();
         }
    +    batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is
created when the first put request is received.
       }
     
    -//  private final AggOutcome setOkAndReturn() {
    -//    this.outcome = IterOutcome.OK;
    -//    for (VectorWrapper<?> v : outgoing) {
    -//      v.getValueVector().getMutator().setValueCount(outputCount);
    -//    }
    -//    return AggOutcome.RETURN_OUTCOME;
    -//  }
     
       private final void incIndex() {
         underlyingIndex++;
         if (underlyingIndex >= incoming.getRecordCount()) {
           currentIndex = Integer.MAX_VALUE;
           return;
         }
    -    currentIndex = getVectorIndex(underlyingIndex);
    +    try { currentIndex = getVectorIndex(underlyingIndex); }
    +    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);}
       }
     
       private final void resetIndex() {
         underlyingIndex = -1;
         incIndex();
       }
     
    -  private void addBatchHolder() {
    +  private boolean isSpilled(int part) {
    +    return outputStream[part] != null;
    +  }
    +  /**
    +   * Which partition to choose for flushing out (i.e. spill or return) ?
    +   * - The current partition (to which a new bach holder is added) has a priority,
    +   *   because its last batch holder is full.
    +   * - Also the largest prior spilled partition has some priority, as it is already spilled;
    +   *   but spilling too few rows (e.g. a single batch) gets us nothing.
    +   * - So the largest non-spilled partition has some priority, to get more memory freed.
    +   * Need to weigh the above three options.
    +   *
    +   *  @param currPart - The partition that hit the memory limit (gets a priority)
    +   *  @return The partition (number) chosen to be spilled
    +   */
    +  private int chooseAPartitionToFlush(int currPart) {
    +    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
    +    int currPartSize = batchHolders[currPart].size();
    +    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is
1
    +    // first find the largest spilled partition
    +    int maxSizeSpilled = -1;
    +    int indexMaxSpilled = -1;
    +    for (int isp = 0; isp < numPartitions; isp++ ) {
    +      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
    +        maxSizeSpilled = batchHolders[isp].size();
    +        indexMaxSpilled = isp;
    +      }
    +    }
    +    // Give the current (if already spilled) some priority
    +    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
    +      maxSizeSpilled = currPartSize ;
    +      indexMaxSpilled = currPart;
    +    }
    +    // now find the largest non-spilled partition
    +    int maxSize = -1;
    +    int indexMax = -1;
    +    // Use the largest spilled (if found) as a base line, with a factor of 4
    +    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
    +      indexMax = indexMaxSpilled;
    +      maxSize = 4 * maxSizeSpilled ;
    +    }
    +    for ( int insp = 0; insp < numPartitions; insp++) {
    +      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
    +        indexMax = insp;
    +        maxSize = batchHolders[insp].size();
    +      }
    +    }
    +    // again - priority to the current partition
    +    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
    +      return currPart;
    +    }
    +    if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
    +      return -1; // try skipping this spill
    +    }
    +    return indexMax;
    +  }
    +
    +  /**
    +   * Iterate through the batches of the given partition, writing them to a file
    +   *
    +   * @param part The partition (number) to spill
    +   */
    +  private void spillAPartition(int part) {
    +
    +    ArrayList<BatchHolder> currPartition = batchHolders[part];
    +    rowsInPartition = 0;
    +    if ( EXTRA_DEBUG_SPILL ) {
    +      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}",
part, cycleNum, currPartition.size());
    +    }
    +
    +    if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
    +
    +    // If this is the first spill for this partition, create an output stream
    +    if ( ! isSpilled(part) ) {
    +
    +      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum)
: null);
    +
    +      try {
    +        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to open spill file:
" + spillFiles[part]);
    +      }
    +    }
    +
    +    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++
) {
    +
    +      // get the number of records in the batch holder that are pending output
    +      int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
    +
    +      rowsInPartition += numPendingOutput;  // for logging
    +      rowsSpilled += numPendingOutput;
    +
    +      allocateOutgoing(numPendingOutput);
    --- End diff --
    
    Suppose numPendingOutput is small (e.g., 100 rows) but there are several such batches;
we would end up calling allocateOutgoing() multiple times for small amounts. Shouldn't we
try to combine these into a smaller number of outgoing batches?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message