drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
Date Sat, 27 May 2017 05:39:07 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027250#comment-16027250 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118814054
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
     
       @Override
       public int getOutputCount() {
    -    // return outputCount;
         return lastBatchOutputCount;
       }
     
       @Override
       public void cleanup() {
    -    if (htable != null) {
    -      htable.clear();
    -      htable = null;
    -    }
    +      if ( schema == null ) { return; } // not set up; nothing to clean
    +      for ( int i = 0; i < numPartitions; i++) {
    +          if (htables[i] != null) {
    +              htables[i].clear();
    +              htables[i] = null;
    +          }
    +          if ( batchHolders[i] != null) {
    +              for (BatchHolder bh : batchHolders[i]) {
    +                    bh.clear();
    +              }
    +              batchHolders[i].clear();
    +              batchHolders[i] = null;
    +          }
    +
    +          // delete any (still active) output spill file
    +          if ( outputStream[i] != null && spillFiles[i] != null) {
    +            try {
    +              spillSet.delete(spillFiles[i]);
    +            } catch(IOException e) {
    +              logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]);
    +            }
    +          }
    +      }
    +      // delete any spill file left in unread spilled partitions
    +      while ( ! spilledPartitionsList.isEmpty() ) {
    +        SpilledPartition sp = spilledPartitionsList.remove(0);
    +        try {
    +          spillSet.delete(sp.spillFile);
    +        } catch(IOException e) {
    +          logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
    +        }
    +      }
    +      spillSet.close(); // delete the spill directory(ies)
         htIdxHolder = null;
         materializedValueFields = null;
         outStartIdxHolder = null;
         outNumRecordsHolder = null;
    +  }
     
    -    if (batchHolders != null) {
    -      for (BatchHolder bh : batchHolders) {
    +  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches)
    +  // then reallocate them in pristine state to allow the partition to continue receiving rows
    +  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
    +    assert htables[part] != null;
    +    htables[part].reset();
    +    if ( batchHolders[part] != null) {
    +      for (BatchHolder bh : batchHolders[part]) {
             bh.clear();
           }
    -      batchHolders.clear();
    -      batchHolders = null;
    +      batchHolders[part].clear();
         }
    +    batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
       }
     
    -//  private final AggOutcome setOkAndReturn() {
    -//    this.outcome = IterOutcome.OK;
    -//    for (VectorWrapper<?> v : outgoing) {
    -//      v.getValueVector().getMutator().setValueCount(outputCount);
    -//    }
    -//    return AggOutcome.RETURN_OUTCOME;
    -//  }
     
       private final void incIndex() {
         underlyingIndex++;
         if (underlyingIndex >= incoming.getRecordCount()) {
           currentIndex = Integer.MAX_VALUE;
           return;
         }
    -    currentIndex = getVectorIndex(underlyingIndex);
    +    try { currentIndex = getVectorIndex(underlyingIndex); }
    +    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);}
       }
     
       private final void resetIndex() {
         underlyingIndex = -1;
         incIndex();
       }
     
    -  private void addBatchHolder() {
    +  private boolean isSpilled(int part) {
    +    return outputStream[part] != null;
    +  }
    +  /**
    +   * Which partition to choose for flushing out (i.e., spill or return)?
    +   * - The current partition (to which a new batch holder is added) has a priority,
    +   *   because its last batch holder is full.
    +   * - Also the largest prior spilled partition has some priority, as it is already spilled;
    +   *   but spilling too few rows (e.g. a single batch) gets us nothing.
    +   * - So the largest non-spilled partition has some priority, to get more memory freed.
    +   * Need to weigh the above three options.
    +   *
    +   *  @param currPart - The partition that hit the memory limit (gets a priority)
    +   *  @return The partition (number) chosen to be spilled
    +   */
    +  private int chooseAPartitionToFlush(int currPart) {
    +    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
    +    int currPartSize = batchHolders[currPart].size();
    +    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
    +    // first find the largest spilled partition
    +    int maxSizeSpilled = -1;
    +    int indexMaxSpilled = -1;
    +    for (int isp = 0; isp < numPartitions; isp++ ) {
    +      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
    +        maxSizeSpilled = batchHolders[isp].size();
    +        indexMaxSpilled = isp;
    +      }
    +    }
    +    // Give the current (if already spilled) some priority
    +    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
    +      maxSizeSpilled = currPartSize ;
    +      indexMaxSpilled = currPart;
    +    }
    +    // now find the largest non-spilled partition
    +    int maxSize = -1;
    +    int indexMax = -1;
    +    // Use the largest spilled (if found) as a base line, with a factor of 4
    +    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
    +      indexMax = indexMaxSpilled;
    +      maxSize = 4 * maxSizeSpilled ;
    +    }
    +    for ( int insp = 0; insp < numPartitions; insp++) {
    +      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
    +        indexMax = insp;
    +        maxSize = batchHolders[insp].size();
    +      }
    +    }
    +    // again - priority to the current partition
    +    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
    +      return currPart;
    +    }
    +    if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
    +      return -1; // try skipping this spill
    +    }
    +    return indexMax;
    +  }
    +
    +  /**
    +   * Iterate through the batches of the given partition, writing them to a file
    +   *
    +   * @param part The partition (number) to spill
    +   */
    +  private void spillAPartition(int part) {
    +
    +    ArrayList<BatchHolder> currPartition = batchHolders[part];
    +    rowsInPartition = 0;
    +    if ( EXTRA_DEBUG_SPILL ) {
    +      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
    +    }
    +
    +    if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
    +
    +    // If this is the first spill for this partition, create an output stream
    +    if ( ! isSpilled(part) ) {
    +
    +      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
    +
    +      try {
    +        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to open spill file: " + spillFiles[part]);
    +      }
    +    }
    +
    +    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
    +
    +      // get the number of records in the batch holder that are pending output
    +      int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
    +
    +      rowsInPartition += numPendingOutput;  // for logging
    +      rowsSpilled += numPendingOutput;
    +
    +      allocateOutgoing(numPendingOutput);
    +
    +      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
    +      int numOutputRecords = outNumRecordsHolder.value;
    +
    +      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
    +
    +      // set the value count for outgoing batch value vectors
    +      /* int i = 0; */
    +      for (VectorWrapper<?> v : outgoing) {
    +        v.getValueVector().getMutator().setValueCount(numOutputRecords);
    +        /*
    +        // print out the first row to be spilled ( varchar, varchar, bigint )
    +        try {
    +          if (i++ < 2) {
    +            NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
    +            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
    +          } else {
    +            NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
    +            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
    +          }
    +        } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
    +        */
    +      }
    +
    +      outContainer.setRecordCount(numPendingOutput);
    +      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
    +      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
    +      Stopwatch watch = Stopwatch.createStarted();
    +      try {
    +        outputBatch.writeToStream(outputStream[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to write to output stream: " + outputStream[part].toString());
    +      }
    +      outContainer.zeroVectors();
    +      logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
    +    }
    +
    +    spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
    +
    +    logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
    +  }
    +
    +  private void addBatchHolder(int part) {
    +
         BatchHolder bh = newBatchHolder();
    -    batchHolders.add(bh);
    +    batchHolders[part].add(bh);
     
         if (EXTRA_DEBUG_1) {
    -      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
    +      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
         }
     
         bh.setup();
       }
     
    -  // Overridden in the generated class when created as plain Java code.
    -
    +  // These methods are overridden in the generated class when created as plain Java code.
       protected BatchHolder newBatchHolder() {
         return new BatchHolder();
       }
    +  protected SpilledRecordbatch newSpilledRecordBatch(String arg1, int arg2, FragmentContext arg4, BatchSchema arg5, OperatorContext arg6) {
    +    return new SpilledRecordbatch(arg1, arg2, arg4, arg5, arg6);
    +  }
     
    +  /**
    +   * Output the next batch from partition "nextPartitionToReturn"
    +   *
    +   * @return iteration outcome (e.g., OK, NONE ...)
    +   */
       @Override
       public IterOutcome outputCurrentBatch() {
    -    if (outBatchIndex >= batchHolders.size()) {
    -      this.outcome = IterOutcome.NONE;
    -      return outcome;
    +
    +    // when incoming was an empty batch, just finish up
    +    if ( schema == null ) {
    +      logger.trace("Incoming was empty; output is an empty batch.");
    +      this.outcome = IterOutcome.NONE; // no records were read
    +      allFlushed = true;
    +      return this.outcome;
         }
     
    -    // get the number of records in the batch holder that are pending output
    -    int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput();
    +    // Initialization (covers the case of early output)
    +    ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
    +    int currOutBatchIndex = outBatchIndex[earlyPartition];
    +    int partitionToReturn = earlyPartition;
    +
    +    if ( ! earlyOutput ) {
    +      // Update the next partition to return (if needed)
    +      // skip fully returned (or spilled) partitions
    +      while (nextPartitionToReturn < numPartitions) {
    +        //
    +        // If this partition was spilled - spill the rest of it and skip it
    +        //
    +        if ( isSpilled(nextPartitionToReturn) ) {
    +          spillAPartition(nextPartitionToReturn); // spill the rest
    +          SpilledPartition sp = new SpilledPartition();
    +          sp.spillFile = spillFiles[nextPartitionToReturn];
    +          sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
    +          sp.cycleNum = cycleNum; // remember the current cycle
    +          sp.origPartn = nextPartitionToReturn; // for debugging / filename
    +          sp.prevOrigPartn = originalPartition; // for debugging / filename
    +          spilledPartitionsList.add(sp);
    +          try {
    +            reinitPartition(nextPartitionToReturn); // free the memory
    +          } catch (Exception e) {throw new RuntimeException(e);}
    +          try {
    +            long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]);
    +            spillSet.tallyWriteBytes(posn); // for the IO stats
    +            outputStream[nextPartitionToReturn].close();
    +          } catch (IOException e) { throw new RuntimeException(e); }
    +          outputStream[nextPartitionToReturn] = null;
    +        }
    +        else {
    +          currPartition = batchHolders[nextPartitionToReturn];
    +          currOutBatchIndex = outBatchIndex[nextPartitionToReturn];
    +          // If curr batch (partition X index) is not empty - proceed to return it
    +          if (currOutBatchIndex < currPartition.size() && 0 != currPartition.get(currOutBatchIndex).getNumPendingOutput()) {
    +            break;
    +          }
    +        }
    +        nextPartitionToReturn++; // else check next partition
    +      }
    +
    +      // if passed the last partition
    +      if (nextPartitionToReturn >= numPartitions) {
    +        // The following "if" is probably never used, due to a similar check at the end of this method
    +        if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
    +          allFlushed = true;
    +          this.outcome = IterOutcome.NONE;
    +          if ( is2ndPhase ) {
    +            stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
    +                (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
    +          }
    +          return outcome;  // then return NONE
    +        }
    +        // Else - there are still spilled partitions to process - pick one and handle just like a new incoming
    +        buildComplete = false; // go back and call doWork() again
    +        handlingSpills = true; // beginning to work on the spill files
    +        // pick a spilled partition; set a new incoming ...
    +        SpilledPartition sp = spilledPartitionsList.remove(0);
    +        SpilledRecordbatch newIncoming = newSpilledRecordBatch(sp.spillFile, sp.spilledBatches, context, schema, oContext);
    +        originalPartition = sp.origPartn; // used for the filename
    +        logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
    +        // prevOriginalPartition = sp.prevOrigPartn;
    +        // Initialize .... new incoming, new set of partitions
    +        try { initializeSetup(newIncoming); } catch (Exception e) { throw new RuntimeException(e); }
    +        // update the cycle num if needed
    +        // The current cycle num should always be one larger than in the spilled partition
    +        if ( cycleNum == sp.cycleNum ) {
    +          cycleNum = 1 + sp.cycleNum;
    +          stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
    +          // report memory stressful situations
    +          if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
    +          if ( cycleNum == 3 ) { logger.info("TERTIARY SPILLING "); }
    +        }
    +        if ( EXTRA_DEBUG_SPILL ) {
    +          logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.",
    +              sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size());
    +        }
    +        return IterOutcome.RESTART;
    +      }
    +
    +      partitionToReturn = nextPartitionToReturn ;
     
    -    if (numPendingOutput == 0) {
    -      this.outcome = IterOutcome.NONE;
    -      return outcome;
         }
     
    +    // get the number of records in the batch holder that are pending output
    +    int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
    +
    +    // The following accounting is for logging, metrics, etc.
    +    rowsInPartition += numPendingOutput ;
    +    if ( ! handlingSpills ) { rowsNotSpilled += numPendingOutput; }
    +    else { rowsSpilledReturned += numPendingOutput; }
    +    if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; }
    +
         allocateOutgoing(numPendingOutput);
     
    -    batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
    +    currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
         int numOutputRecords = outNumRecordsHolder.value;
     
         if (EXTRA_DEBUG_1) {
          logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
         }
    -    this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
    +
    +    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
     
         // set the value count for outgoing batch value vectors
         for (VectorWrapper<?> v : outgoing) {
           v.getValueVector().getMutator().setValueCount(numOutputRecords);
         }
     
    -//    outputCount += numOutputRecords;
    -
         this.outcome = IterOutcome.OK;
     
    -    logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
    +    // logger.debug("HashAggregate: Output {} current batch index {} with {} records for partition {}.", earlyOutput ? "(early)" : "",
    +    //    outBatchIndex, numOutputRecords, partitionToReturn);
    +    if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
    +      logger.debug("So far returned {} + SpilledReturned {}  total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned,
    +        rowsNotSpilled+rowsSpilledReturned,
    +        rowsSpilled);
    +    }
     
         lastBatchOutputCount = numOutputRecords;
    -    outBatchIndex++;
    -    if (outBatchIndex == batchHolders.size()) {
    -      allFlushed = true;
    +    outBatchIndex[partitionToReturn]++;
    +    // if just flushed the last batch in the partition
    +    if (outBatchIndex[partitionToReturn] == currPartition.size()) {
    +
    +      if ( EXTRA_DEBUG_SPILL ) {
    +        logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows",
    +            earlyOutput ? "(Early)" : "",
    +            partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition);
    +      }
    +      rowsInPartition = 0; // reset to count for the next partition
    +
    +      try {
    +        // deallocate memory used by this partition, and re-initialize
    +        reinitPartition(partitionToReturn);
    +      } catch (SchemaChangeException sce) {
    +        throw new DrillRuntimeException("Hash Aggregation can not handle schema changes.");
    --- End diff --
    
    This isn't a real schema change; it is a bogus one, an artifact of the way the low-level functions work. Throw an `IllegalStateException`.
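    
    For illustration, a minimal sketch of what that suggestion could look like in the catch block quoted above (reinitPartition, partitionToReturn and SchemaChangeException are taken from the diff; the exact message text is only an assumption):
    
        try {
          // deallocate memory used by this partition, and re-initialize
          reinitPartition(partitionToReturn);
        } catch (SchemaChangeException sce) {
          // Not a genuine schema change - an artifact of the low-level call signatures,
          // so surface it as an internal illegal state rather than a user-facing Drill error.
          throw new IllegalStateException("Unexpected schema change inside hash aggregate", sce);
        }
    
    IllegalStateException(String, Throwable) is a standard JDK constructor, so wrapping this way keeps the original cause attached for debugging.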


> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradual spilling of memory to disk as the available memory becomes too small to allow in-memory work for the Hash Aggregate Operator.
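
A rough conceptual sketch of that flow, pieced together from the method names in the diff above (the wrapper method name, the allocator/memoryLimit check, and the exception handling are assumptions for illustration, not Drill code):

    private void checkMemoryAndSpillIfNeeded(int currPart) throws Exception {
      if (allocator.getAllocatedMemory() < memoryLimit) { return; }  // still fits in memory
      int victim = chooseAPartitionToFlush(currPart);  // weigh current vs. spilled vs. largest partition
      if (victim < 0) { return; }      // nothing worth spilling (e.g., only single-batch partitions)
      spillAPartition(victim);         // write the partition's batches out to its spill file
      reinitPartition(victim);         // free its memory so aggregation can keep going
    }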



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
