drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
Date Thu, 15 Jun 2017 22:13:00 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051148#comment-16051148 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122324618
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -512,122 +509,122 @@ private void updateEstMaxBatchSize(RecordBatch incoming) {
         }
       }
     
    +  /**
    +   *  Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
    +   *  Once complete, get the incoming NEXT batch and process it as well, etc.
    +   *  For 1st phase, may return when an early output needs to be performed.
    +   *
    +   * @return Agg outcome status
    +   */
       @Override
       public AggOutcome doWork() {
    -    try {
    -      // Note: Keeping the outer and inner try blocks here to maintain some similarity with
    -      // StreamingAggregate which does somethings conditionally in the outer try block.
    -      // In the future HashAggregate may also need to perform some actions conditionally
    -      // in the outer try block.
    -
    -      assert ! handlingSpills || currentIndex < Integer.MAX_VALUE;
     
    -      outside:
    -      while (true) {
    +    while (true) {
     
    -        // This would be called only once - after actual data arrives on incoming
    -        if ( schema == null && incoming.getRecordCount() > 0 ) {
    -          this.schema = incoming.getSchema();
    -          // Calculate the number of partitions based on actual incoming data
    -          delayedSetup();
    -        }
    +      // This would be called only once - first time actual data arrives on incoming
    +      if ( schema == null && incoming.getRecordCount() > 0 ) {
    +        this.schema = incoming.getSchema();
    +        currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
    +        // Calculate the number of partitions based on actual incoming data
    +        delayedSetup();
    +      }
     
    -        // loop through existing records, aggregating the values as necessary.
    -        if (EXTRA_DEBUG_1) {
    -          logger.debug("Starting outer loop of doWork()...");
    +      //
    +      //  loop through existing records in this batch, aggregating the values as necessary.
    +      //
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Starting outer loop of doWork()...");
    +      }
    +      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
    +        if (EXTRA_DEBUG_2) {
    +          logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
             }
    -        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
    -          if (EXTRA_DEBUG_2) {
    -            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
    -          }
    -          checkGroupAndAggrValues(currentIndex);
    -          // If adding a group discovered a memory pressure during 1st phase, then start
    -          // outputing some partition to free memory.
    -          if ( earlyOutput ) {
    -            outputCurrentBatch();
    -            incIndex(); // next time continue with the next incoming row
    -            return AggOutcome.RETURN_OUTCOME;
    -          }
    +        checkGroupAndAggrValues(currentIndex);
    +        // If adding a group discovered a memory pressure during 1st phase, then start
    +        // outputing some partition downstream in order to free memory.
    +        if ( earlyOutput ) {
    +          outputCurrentBatch();
    +          incIndex(); // next time continue with the next incoming row
    +          return AggOutcome.RETURN_OUTCOME;
             }
    +      }
    +
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Processed {} records", underlyingIndex);
    +      }
     
    -        if (EXTRA_DEBUG_1) {
    -          logger.debug("Processed {} records", underlyingIndex);
    +      // Cleanup the previous batch since we are done processing it.
    +      for (VectorWrapper<?> v : incoming) {
    +        v.getValueVector().clear();
    +      }
    +      //
    +      // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
    +      // from one of the spill files (The spill case is handled differently here to avoid
    +      // collecting stats on the spilled records)
    +      //
    +      if ( handlingSpills ) {
    +        outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
    +      } else {
    +        long beforeAlloc = allocator.getAllocatedMemory();
    +
    +        // Get the next RecordBatch from the incoming (i.e. upstream operator)
    +        outcome = outgoing.next(0, incoming);
    +
    +        // If incoming batch is bigger than our estimate - adjust the estimate to match
    +        long afterAlloc = allocator.getAllocatedMemory();
    +        long incomingBatchSize = afterAlloc - beforeAlloc;
    +        if ( estMaxBatchSize < incomingBatchSize) {
    +          logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize);
    +          estMaxBatchSize = incomingBatchSize;
             }
    +      }
     
    -        try {
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Received IterOutcome of {}", outcome);
    +      }
     
    -          while (true) {
    -            // Cleanup the previous batch since we are done processing it.
    -            long pre = allocator.getAllocatedMemory();
    -            for (VectorWrapper<?> v : incoming) {
    -              v.getValueVector().clear();
    -            }
    -            long beforeAlloc = allocator.getAllocatedMemory();
    +      // Handle various results from getting the next batch
    +      switch (outcome) {
    +        case OUT_OF_MEMORY:
    +        case NOT_YET:
    --- End diff --
    
    The problem with `OUT_OF_MEMORY` is that it basically does not work. But if this is the original code, we can address that problem another time.
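
For readers without the full source at hand, the control flow described in the new Javadoc for `doWork()` (process the rows of the current batch, return early when an early output is needed, resume at the next row on the following call, then fetch the next batch) can be sketched roughly as below. This is only an illustration under stated assumptions, not Drill code: the class `ResumableAggSketch`, its `Outcome` enum, the `maxGroups` limit standing in for memory pressure, and the use of `int[]` arrays as batches are all hypothetical, and the sketch does not model `OUT_OF_MEMORY` or `NOT_YET` at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the loop in doWork(); not Drill code.
final class ResumableAggSketch {
  enum Outcome { RETURN_OUTCOME, DONE }

  private final Iterator<int[]> batches;   // stand-in for the incoming batches
  private final int maxGroups;             // assumed stand-in for the memory limit
  private final Map<Integer, Long> counts = new HashMap<>();
  private final List<Map<Integer, Long>> emitted = new ArrayList<>();
  private int[] current = new int[0];      // batch currently being processed
  private int index = 0;                   // row position, kept across calls

  ResumableAggSketch(List<int[]> input, int maxGroups) {
    this.batches = input.iterator();
    this.maxGroups = maxGroups;
  }

  Outcome doWork() {
    while (true) {
      // Aggregate the remaining rows of the current batch (here: count per key).
      for (; index < current.length; index++) {
        counts.merge(current[index], 1L, Long::sum);
        if (counts.size() >= maxGroups) {
          flush();   // early output of the partial aggregate
          index++;   // next call continues with the next row
          return Outcome.RETURN_OUTCOME;
        }
      }
      // Current batch is exhausted: fetch the next one, or finish.
      if (!batches.hasNext()) {
        flush();
        return Outcome.DONE;
      }
      current = batches.next();
      index = 0;
    }
  }

  // Move the current partial aggregate to the output list and free the table.
  private void flush() {
    if (!counts.isEmpty()) {
      emitted.add(new HashMap<>(counts));
      counts.clear();
    }
  }

  List<Map<Integer, Long>> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    List<int[]> input = Arrays.asList(new int[] {1, 2, 3}, new int[] {1, 4});
    ResumableAggSketch agg = new ResumableAggSketch(input, 2);
    while (agg.doWork() == Outcome.RETURN_OUTCOME) {
      // A real caller would pass the partial output downstream here.
    }
    System.out.println(agg.emitted().size() + " partial outputs");
  }
}
```

The same key may appear in more than one partial output, which is only acceptable when a later phase merges the partial results; that matches the Javadoc's note that the early return applies to the 1st phase.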


> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradually spilling memory to disk when the available memory becomes too small to allow in-memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
