From: Ben-Zvi
To: dev@drill.apache.org
Reply-To: dev@drill.apache.org
Subject: [GitHub] drill pull request #822: DRILL-5457: Spill implementation for Hash Aggregate
Date: Wed, 31 May 2017 21:41:22 +0000 (UTC)
Message-Id: <20170531214122.31142E0016@git1-us-west.apache.org>

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r119481421

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
           }
         }
     
    -    ChainedHashTable ht =
    +    spillSet = new SpillSet(context, hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
    +    baseHashTable =
             new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
    -    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
    -
    +    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
         numGroupByOutFields = groupByOutFieldIds.length;
    -    batchHolders = new ArrayList<BatchHolder>();
    -    // First BatchHolder is created when the first put request is received.
     
         doSetup(incoming);
       }
     
    +  /**
    +   * Delayed setup covers the parts of setup() that can only run after actual data arrives in incoming.
    +   * This data is used to compute the number of partitions.
    +   */
    +  private void delayedSetup() {
    +
    +    // Set the number of partitions from the configuration (raise to a power of two, if needed)
    +    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
    +    if ( numPartitions == 1 ) {
    +      canSpill = false;
    +      logger.warn("Spilling was disabled");
    +    }
    +    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
    +      numPartitions++;
    +    }
    +    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
    +    else {
    +      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
    +      updateEstMaxBatchSize(incoming);
    +    }
    +    long memAvail = memoryLimit - allocator.getAllocatedMemory();
    +    if ( !canSpill ) { // single phase, or spill disabled by configuration
    +      numPartitions = 1; // single phase should use only a single partition (to save memory)
    +    } else { // two phase
    +      // Adjust down the number of partitions if needed - when the available memory cannot hold as
    +      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)
    +      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024 ) > memAvail ) {
    +        numPartitions /= 2;
    +        if ( numPartitions < 2 ) {
    +          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
    +          break;
    +        }
    +      }
    +    }
    +    logger.trace("{} phase. Number of partitions chosen: {}. {} spill",
    +        isTwoPhase ? (is2ndPhase ? "2nd" : "1st") : "Single",
    +        numPartitions, canSpill ? "Can" : "Cannot");
    +
    +    // The following initial safety check should be revisited once we can lower the number of rows in a batch
    +    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
    +    if ( numPartitions == 1 ) {
    +      // if too little memory - behave like the old code -- no memory limit for hash aggregate
    +      allocator.setLimit(10_000_000_000L);
    +    }
    +    // Based on the number of partitions: set the mask and bit count
    +    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
    +    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
    +
    +    // Create arrays (one entry per partition)
    +    htables = new HashTable[numPartitions];
    +    batchHolders = (ArrayList<BatchHolder>[]) new ArrayList[numPartitions];
    +    outBatchIndex = new int[numPartitions];
    +    outputStream = new OutputStream[numPartitions];
    +    spilledBatchesCount = new int[numPartitions];
    +    // spilledPaths = new Path[numPartitions];
    +    spillFiles = new String[numPartitions];
    +    spilledPartitionsList = new ArrayList();
    +
    +    plannedBatches = numPartitions; // each partition should allocate its first batch
    +
    +    // initialize every (per partition) entry in the arrays
    +    for (int i = 0; i < numPartitions; i++ ) {
    +      try {
    +        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
    +        this.htables[i].setMaxVarcharSize(maxColumnWidth);
    +      } catch (IllegalStateException ise) {} // ignore
    +      catch (Exception e) { throw new DrillRuntimeException(e); }
    --- End diff --
    
    Done:
    
        } catch (ClassTransformationException e) {
          throw UserException.unsupportedError(e)
              .message("Code generation error - likely an error in the code.")
              .build(logger);
        } catch (IOException e) {
          throw UserException.resourceError(e)
              .message("IO Error while creating a hash table.")
              .build(logger);
        } catch (SchemaChangeException sce) {
          throw new IllegalStateException("Unexpected Schema Change while creating a hash table", sce);
        }
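The replacement above reports code-generation and IO failures through Drill's UserException builders, which attach a user-facing message and log through the supplied logger, instead of silently swallowing IllegalStateException as the original catch clause did.

For readers following the delayedSetup() hunk, the partition-sizing arithmetic can be tried standalone. The sketch below is a minimal illustration in plain Java, not Drill code: the configured partition count, estimated batch size, minimum batches per partition, and available memory are all made-up numbers standing in for the real values.

    // Illustrative sketch of the partition-sizing and masking logic in
    // delayedSetup() above. All numbers are assumptions; not Drill code.
    public class HashAggPartitionDemo {
      public static void main(String[] args) {
        int numPartitions = 12;                        // assumed configured value (not a power of 2)
        long estMaxBatchSize = 4L * 1024 * 1024;       // assumed 4 MB estimated batch size
        int minBatchesPerPartition = 3;                // assumed configurable minimum
        long overhead = 8L * 1024 * 1024;              // per-partition overhead, as in the hunk
        long memAvail = 100L * 1024 * 1024;            // assumed 100 MB of available memory

        // Raise to a power of two, as in the hunk: increment until only one bit is set.
        while (Integer.bitCount(numPartitions) > 1) {
          numPartitions++;
        }
        System.out.println("rounded up: " + numPartitions);         // 12 -> 16

        // Halve until the worst-case footprint fits the available memory.
        while (numPartitions * (estMaxBatchSize * minBatchesPerPartition + overhead) > memAvail) {
          numPartitions /= 2;
          if (numPartitions < 2) {
            break;  // the 2nd phase would give up on spilling at this point
          }
        }
        System.out.println("after memory check: " + numPartitions); // 16 -> 4

        // With a power of two, masking the hash keeps its low bits and always
        // yields a value in [0, numPartitions), selecting a partition cheaply.
        int partitionMask = numPartitions - 1;                      // e.g. 4 -> 0x3
        int bitsInMask = Integer.bitCount(partitionMask);           // e.g. 0x3 -> 2
        int hash = "sample group-by key".hashCode();
        System.out.println("partition: " + (hash & partitionMask)
            + " (mask 0x" + Integer.toHexString(partitionMask) + ", " + bitsInMask + " bits)");
      }
    }

Masking works as a cheap substitute for modulo only because the partition count is kept at a power of two, which is why delayedSetup() rounds the configured value up before anything else.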