drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
Date Sat, 27 May 2017 05:39:06 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027231#comment-16027231 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118812385
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
    @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
           }
         }
     
    -    ChainedHashTable ht =
    +    spillSet = new SpillSet(context, hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
    +    baseHashTable =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
    -    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
    -
    +    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
         numGroupByOutFields = groupByOutFieldIds.length;
    -    batchHolders = new ArrayList<BatchHolder>();
    -    // First BatchHolder is created when the first put request is received.
     
         doSetup(incoming);
       }
     
    +  /**
    +   *  Delayed setup covers the parts of setup() that can only run after actual data arrives in incoming.
    +   *  This data is used to compute the number of partitions.
    +   */
    +  private void delayedSetup() {
    +
    +    // Set the number of partitions from the configuration (raise to a power of two, if needed)
    +    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
    +    if ( numPartitions == 1 ) {
    +      canSpill = false;
    +      logger.warn("Spilling was disabled");
    +    }
    +    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
    +      numPartitions++;
    +    }
    +    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
    +    else {
    +      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
    +      updateEstMaxBatchSize(incoming);
    +    }
    +    long memAvail = memoryLimit - allocator.getAllocatedMemory();
    +    if ( !canSpill ) { // single phase, or spill disabled by configuration
    +      numPartitions = 1; // single phase should use only a single partition (to save memory)
    +    } else { // two phase
    +      // Adjust down the number of partitions if needed - when the available memory cannot hold as
    +      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)
    +      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
    +        numPartitions /= 2;
    +        if ( numPartitions < 2) {
    +          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
    +          break;
    +        }
    +      }
    +    }
    +    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
    +        numPartitions, canSpill ? "Can" : "Cannot");
    +
    +    // The following initial safety check should be revisited once we can lower the number of rows in a batch
    +    // In cases of very tight memory -- need at least enough memory to process one batch, plus overhead (e.g. hash table)
    +    if ( numPartitions == 1 ) {
    +      // if too little memory - behave like the old code -- no memory limit for hash aggregate
    +      allocator.setLimit(10_000_000_000L);
    +    }
    +    // Based on the number of partitions: Set the mask and bit count
    +    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
    +    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
    +
    +    // Create arrays (one entry per partition)
    +    htables = new HashTable[numPartitions] ;
    +    batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions] ;
    +    outBatchIndex = new int[numPartitions] ;
    +    outputStream = new OutputStream[numPartitions];
    +    spilledBatchesCount = new int[numPartitions];
    +    // spilledPaths = new Path[numPartitions];
    +    spillFiles = new String[numPartitions];
    +    spilledPartitionsList = new ArrayList<SpilledPartition>();
    +
    +    plannedBatches = numPartitions; // each partition should allocate its first batch
    +
    +    // initialize every (per partition) entry in the arrays
    +    for (int i = 0; i < numPartitions; i++ ) {
    +      try {
    +        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
    +        this.htables[i].setMaxVarcharSize(maxColumnWidth);
    +      } catch (IllegalStateException ise) {} // ignore
    --- End diff --
    
    Here, rather than doing a loop that keeps arrays in sync, if we go with the partition state class, we simply call, say, `setup()` for each partition. Maybe even just call the constructor and let it set up the per-partition state.
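
    As a minimal sketch of that suggestion (all names below are hypothetical stand-ins, not
    from the PR; in Drill the fields would be the HashTable, ArrayList<BatchHolder>,
    output-batch index, spill stream, and spill-file name that the diff keeps in parallel
    arrays):

        import java.util.ArrayList;
        import java.util.List;

        // Hypothetical sketch: one object per partition, instead of the
        // parallel arrays htables[], batchHolders[], outBatchIndex[], ...
        class PartitionState {
          final Object hashTable;                 // stands in for Drill's HashTable
          final List<Object> batchHolders = new ArrayList<>(); // in-memory batches
          int outBatchIndex = 0;
          int spilledBatchesCount = 0;
          String spillFile = null;                // null until this partition spills

          // The constructor does the per-partition setup, so delayedSetup()
          // can never leave some of the arrays half-initialized.
          PartitionState(Object hashTable) {
            this.hashTable = hashTable;
          }
        }

        class DelayedSetupSketch {
          PartitionState[] partitions;

          void delayedSetup(int numPartitions) {
            partitions = new PartitionState[numPartitions];
            for (int i = 0; i < numPartitions; i++) {
              partitions[i] = new PartitionState(newHashTable());
            }
          }

          // Placeholder for baseHashTable.createAndSetupHashTable(...) in the diff.
          private Object newHashTable() { return new Object(); }
        }

    With this shape, construction and per-partition setup are a single step, so a failure
    while initializing one partition cannot leave six parallel arrays inconsistently
    populated.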

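    Separately, for readers tracing the diff, the partition-count arithmetic (round up to a
    power of two, shrink while the partitions do not fit in memory, then derive the mask and
    bit count) can be exercised in isolation. The snippet below is not Drill code, and the
    batch-size, batches-per-partition, and available-memory numbers are made up for the
    example:

        // Standalone illustration of the arithmetic in delayedSetup() above.
        public class PartitionMathDemo {
          public static void main(String[] args) {
            int numPartitions = 20;                       // configured value
            while (Integer.bitCount(numPartitions) > 1) { // not a power of two yet
              numPartitions++;                            // 20 -> 32
            }

            // Halve the count while batches plus per-partition overhead
            // (8 MB in the diff) exceed the available memory.
            long estMaxBatchSize = 4L << 20;              // assume 4 MB batches
            int minBatchesPerPartition = 2;               // assume config value 2
            long memAvail = 256L << 20;                   // assume 256 MB available
            while (numPartitions >= 2 && numPartitions
                * (estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail) {
              numPartitions /= 2;                         // 32 -> 16
            }

            // The low bits of a hash value select the partition.
            int partitionMask = numPartitions - 1;            // 16 -> 0xF
            int bitsInMask = Integer.bitCount(partitionMask); // 0xF -> 4
            int partition = 0xCAFEBABE & partitionMask;       // -> 14
            System.out.printf("partitions=%d mask=0x%X bits=%d partition=%d%n",
                numPartitions, partitionMask, bitsInMask, partition);
          }
        }

    (The increment loop is linear in the gap to the next power of two; since it runs once
    per operator that hardly matters, but Integer.highestOneBit((numPartitions - 1) << 1)
    would do the same rounding in constant time for inputs above 1.)
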

> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradual spilling of memory to disk as the available memory gets too small to allow in-memory work for the Hash Aggregate Operator.



