drill-dev mailing list archives

From: paul-rogers <...@git.apache.org>
Subject: [GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Date: Sat, 27 May 2017 05:38:09 GMT
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118812385
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
           }
         }
     
    -    ChainedHashTable ht =
    +    spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
    +    baseHashTable =
             new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
    -    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
    -
    +    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
         numGroupByOutFields = groupByOutFieldIds.length;
    -    batchHolders = new ArrayList<BatchHolder>();
    -    // First BatchHolder is created when the first put request is received.
     
         doSetup(incoming);
       }
     
    +  /**
    +   *  Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming
    +   *  This data is used to compute the number of partitions.
    +   */
    +  private void delayedSetup() {
    +
    +    // Set the number of partitions from the configuration (raise to a power of two, if needed)
    +    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
    +    if ( numPartitions == 1 ) {
    +      canSpill = false;
    +      logger.warn("Spilling was disabled");
    +    }
    +    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
    +      numPartitions++;
    +    }
    +    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
    +    else {
    +      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
    +      updateEstMaxBatchSize(incoming);
    +    }
    +    long memAvail = memoryLimit - allocator.getAllocatedMemory();
    +    if ( !canSpill ) { // single phase, or spill disabled by configuration
    +      numPartitions = 1; // single phase should use only a single partition (to save memory)
    +    } else { // two phase
    +      // Adjust down the number of partitions if needed - when the memory available can not hold as
    +      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values))
    +      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
    +        numPartitions /= 2;
    +        if ( numPartitions < 2) {
    +          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
    +          break;
    +        }
    +      }
    +    }
    +    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
    +        numPartitions, canSpill ? "Can" : "Cannot");
    +
    +    // The following initial safety check should be revisited once we can lower the number of rows in a batch
    +    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
    +    if ( numPartitions == 1 ) {
    +      // if too little memory - behave like the old code -- no memory limit for hash aggregate
    +      allocator.setLimit(10_000_000_000L);
    +    }
    +    // Based on the number of partitions: Set the mask and bit count
    +    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
    +    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
    +
    +    // Create arrays (one entry per partition)
    +    htables = new HashTable[numPartitions] ;
    +    batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions] ;
    +    outBatchIndex = new int[numPartitions] ;
    +    outputStream = new OutputStream[numPartitions];
    +    spilledBatchesCount = new int[numPartitions];
    +    // spilledPaths = new Path[numPartitions];
    +    spillFiles = new String[numPartitions];
    +    spilledPartitionsList = new ArrayList<SpilledPartition>();
    +
    +    plannedBatches = numPartitions; // each partition should allocate its first batch
    +
    +    // initialize every (per partition) entry in the arrays
    +    for (int i = 0; i < numPartitions; i++ ) {
    +      try {
    +        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
    +        this.htables[i].setMaxVarcharSize(maxColumnWidth);
    +      } catch (IllegalStateException ise) {} // ignore
    --- End diff --
    
    Here, rather than doing a loop that keeps several arrays in sync, if we go with the partition state class we could simply call, say, `setup()` on each partition. Maybe even just call the constructor and let it set up all the per-partition state, as sketched below.
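
    A minimal sketch of that idea follows. It is only an illustration: the `HashAggPartition` name, the simplified `HashTable`/`BatchHolder`/`ChainedHashTable` stand-in interfaces, and the constructor signature are hypothetical, not the actual Drill classes.

        import java.io.OutputStream;
        import java.util.ArrayList;
        import java.util.List;

        // Simplified stand-ins for the real Drill types (hypothetical signatures).
        interface HashTable { void setMaxVarcharSize(int maxWidth); }
        interface BatchHolder { }
        interface ChainedHashTable {
          HashTable createAndSetupHashTable(int[] groupByOutFieldIds, int numPartitions);
        }

        // Hypothetical per-partition state class: one instance replaces one slot in each of
        // the parallel arrays (htables, batchHolders, outBatchIndex, outputStream,
        // spilledBatchesCount, spillFiles).
        class HashAggPartition {
          private final HashTable htable;               // this partition's hash table
          private final List<BatchHolder> batchHolders; // in-memory batches for this partition
          private OutputStream spillStream;             // non-null once this partition has spilled
          private String spillFile;                     // spill file name, if any
          private int spilledBatchCount;
          private int outBatchIndex;

          // The constructor does the per-partition part of delayedSetup().
          HashAggPartition(ChainedHashTable baseHashTable, int[] groupByOutFieldIds,
                           int numPartitions, int maxColumnWidth) {
            this.htable = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
            this.htable.setMaxVarcharSize(maxColumnWidth);
            this.batchHolders = new ArrayList<>();
          }

          boolean isSpilled() { return spillStream != null; }
        }

        // With this in place, the array-initialization loop in delayedSetup() collapses to:
        //   partitions = new HashAggPartition[numPartitions];
        //   for (int i = 0; i < numPartitions; i++) {
        //     partitions[i] = new HashAggPartition(baseHashTable, groupByOutFieldIds,
        //                                          numPartitions, maxColumnWidth);
        //   }

    Spill-time bookkeeping (opening the output stream, counting spilled batches, remembering the spill file name) could then live on the same class instead of being spread across parallel arrays.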


