drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From paul-rogers <...@git.apache.org>
Subject [GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Date Sat, 27 May 2017 05:38:12 GMT
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118813868
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
    @@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
     
       @Override
       public int getOutputCount() {
    -    // return outputCount;
         return lastBatchOutputCount;
       }
     
       @Override
       public void cleanup() {
    -    if (htable != null) {
    -      htable.clear();
    -      htable = null;
    -    }
    +      if ( schema == null ) { return; } // not set up; nothing to clean
    +      for ( int i = 0; i < numPartitions; i++) {
    +          if (htables[i] != null) {
    +              htables[i].clear();
    +              htables[i] = null;
    +          }
    +          if ( batchHolders[i] != null) {
    +              for (BatchHolder bh : batchHolders[i]) {
    +                    bh.clear();
    +              }
    +              batchHolders[i].clear();
    +              batchHolders[i] = null;
    +          }
    +
    +          // delete any (still active) output spill file
    +          if ( outputStream[i] != null && spillFiles[i] != null) {
    +            try {
    +              spillSet.delete(spillFiles[i]);
    +            } catch(IOException e) {
    +              logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]);
    +            }
    +          }
    +      }
    +      // delete any spill file left in unread spilled partitions
    +      while ( ! spilledPartitionsList.isEmpty() ) {
    +        SpilledPartition sp = spilledPartitionsList.remove(0);
    +        try {
    +          spillSet.delete(sp.spillFile);
    +        } catch(IOException e) {
    +          logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
    +        }
    +      }
    +      spillSet.close(); // delete the spill directory(ies)
         htIdxHolder = null;
         materializedValueFields = null;
         outStartIdxHolder = null;
         outNumRecordsHolder = null;
    +  }
     
    -    if (batchHolders != null) {
    -      for (BatchHolder bh : batchHolders) {
    +  // First free the memory used by the given (spilled) partition (i.e., hash table plus
batches)
    +  // then reallocate them in pristine state to allow the partition to continue receiving
rows
    +  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException,
IOException {
    --- End diff --
    
    Method on the partition state class


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message