Date: Thu, 1 Jun 2017 21:44:04 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@drill.apache.org
Subject: [jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033757#comment-16033757 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/822#discussion_r119736678

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme
         }
       }
-    ChainedHashTable ht =
+    spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    baseHashTable =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
-    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
-    batchHolders = new ArrayList();
-    // First BatchHolder is created when the first put request is received.
     doSetup(incoming);
   }
+  /**
+   * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming
+   * This data is used to compute the number of partitions.
+   */
+  private void delayedSetup() {
+
+    // Set the number of partitions from the configuration (raise to a power of two, if needed)
+    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
+    if ( numPartitions == 1 ) {
+      canSpill = false;
+      logger.warn("Spilling was disabled");
+    }
+    while (Integer.bitCount(numPartitions) > 1) { // in case not a power of 2
+      numPartitions++;
+    }
+    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+    else {
+      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+      updateEstMaxBatchSize(incoming);
+    }
+    long memAvail = memoryLimit - allocator.getAllocatedMemory();
+    if ( !canSpill ) { // single phase, or spill disabled by configuation
+      numPartitions = 1; // single phase should use only a single partition (to save memory)
+    } else { // two phase
+      // Adjust down the number of partitions if needed - when the memory available can not hold as
+      // many batches (configurable option), plus overhead (e.g. hash table, links, hash values))
+      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+        numPartitions /= 2;
+        if ( numPartitions < 2) {
+          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at least 2 to make progress
+          break;
+        }
+      }
+    }
+    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+        numPartitions, canSpill ? "Can" : "Cannot");
+
+    // The following initial safety check should be revisited once we can lower the number of rows in a batch
+    // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
+    if ( numPartitions == 1 ) {
+      // if too little memory - behave like the old code -- no memory limit for hash aggregate
+      allocator.setLimit(10_000_000_000L);
+    }
+    // Based on the number of partitions: Set the mask and bit count
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    // Create arrays (one entry per partition)
+    htables = new HashTable[numPartitions] ;
+    batchHolders = (ArrayList[]) new ArrayList[numPartitions] ;
+    outBatchIndex = new int[numPartitions] ;
+    outputStream = new OutputStream[numPartitions];
+    spilledBatchesCount = new int[numPartitions];
+    // spilledPaths = new Path[numPartitions];
+    spillFiles = new String[numPartitions];
+    spilledPartitionsList = new ArrayList();
+
+    plannedBatches = numPartitions; // each partition should allocate its first batch
+
+    // initialize every (per partition) entry in the arrays
+    for (int i = 0; i < numPartitions; i++ ) {
+      try {
+        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+        this.htables[i].setMaxVarcharSize(maxColumnWidth);
+      } catch (IllegalStateException ise) {} // ignore
+      catch (Exception e) { throw new DrillRuntimeException(e); }
+      this.batchHolders[i] = new ArrayList(); // First BatchHolder is created when the first put request is received.
+    }
+  }
+  /**
+   * get new incoming: (when reading spilled files like an "incoming")
+   * @return The (newly replaced) incoming
+   */
+  @Override
+  public RecordBatch getNewIncoming() { return incoming; }
+
+  private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, ClassTransformationException, IOException {
+    baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming
+    this.incoming = newIncoming;
+    nextPartitionToReturn = 0;
+    for (int i = 0; i < numPartitions; i++ ) {
+      htables[i].reinit(newIncoming);
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = new ArrayList();
+      }
+      outBatchIndex[i] = 0;
+      outputStream[i] = null;
+      spilledBatchesCount[i] = 0;
+      // spilledPaths[i] = null;
+      spillFiles[i] = null;
+    }
+  }
+
+  /**
+   * Update the estimated max batch size to be used in the Hash Aggr Op.
+   * using the record batch size to get the row width.
+   * @param incoming
+   */
+  private void updateEstMaxBatchSize(RecordBatch incoming) {
+    if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change
+    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+    logger.trace("Incoming sizer: {}",sizer);
+    // An empty batch only has the schema, can not tell actual length of varchars
+    // else use the actual varchars length, each capped at 50 (to match the space allocation)
+    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
+    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+
+    // Get approx max (varchar) column width to get better memory allocation
+    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+    maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
+
+    logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}",
+        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+
+    if ( estMaxBatchSize > memoryLimit ) {
+      logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit);
--- End diff --

True; this check flags only the more extreme cases, where the initially available memory cannot hold even a single batch (which is easier to explain in a warning message). Note, however, that after this code runs there is another check (in delayedSetup()) that does include the overhead (as a hard-coded 8MB per partition) and gives another warning when the count drops to 1 partition; that warning was just added in response to a prior review comment (see above).

> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradually spilling memory to disk as the available memory becomes too small for the Hash Aggregate Operator to work in memory.
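To make the difference between the two checks discussed above concrete, here is a minimal standalone sketch of the sizing arithmetic from the diff. It is not Drill code: the class PartitionSizingSketch and its methods are hypothetical, the inputs are plain numbers instead of values read from Drill's config and allocator, minBatchesPerPartition = 2 is an assumed value, and the example assumes nothing is allocated yet (so memAvail equals memoryLimit). The 8MB per-partition overhead and the power-of-two rounding follow the diff; the halving loop is simplified to stop at 1 partition, which gives the same result as the diff's break for counts of 2 or more, and the canSpill handling is omitted.

```java
// Hypothetical, standalone sketch (not Drill code): the partition-sizing arithmetic
// from delayedSetup() and the single-batch check from updateEstMaxBatchSize().
public class PartitionSizingSketch {

    static final long MB = 1024L * 1024;

    // Hard-coded per-partition overhead from the diff (hash table, links, hash values).
    static final long PER_PARTITION_OVERHEAD = 8 * MB;

    /** Round n (assumed >= 1) up to a power of two, like the diff's bitCount loop. */
    static int roundUpToPowerOfTwo(int n) {
        while (Integer.bitCount(n) > 1) {
            n++;
        }
        return n;
    }

    /** First check: the estimated batch alone exceeds the memory limit (no overhead counted). */
    static boolean singleBatchExceedsLimit(long estMaxBatchSize, long memoryLimit) {
        return estMaxBatchSize > memoryLimit;
    }

    /**
     * Second check: halve the partition count until every partition can hold
     * minBatchesPerPartition batches plus the fixed overhead, stopping at 1.
     */
    static int choosePartitions(int configured, long estMaxBatchSize,
                                int minBatchesPerPartition, long memAvail) {
        int numPartitions = roundUpToPowerOfTwo(configured);
        long perPartition = estMaxBatchSize * minBatchesPerPartition + PER_PARTITION_OVERHEAD;
        while (numPartitions > 1 && numPartitions * perPartition > memAvail) {
            numPartitions /= 2;
        }
        return numPartitions;
    }

    public static void main(String[] args) {
        // Assumed inputs: 32 configured partitions, 2 batches per partition, 100 MB available.
        int roomy = choosePartitions(32, 4 * MB, 2, 100 * MB);
        // Mask and bit count as in the diff (e.g. 32 -> 0x1F -> 5 bits).
        System.out.println("partitions=" + roomy
            + " mask=0x" + Integer.toHexString(roomy - 1)
            + " bitsInMask=" + Integer.bitCount(roomy - 1));

        // Tight memory: one 20 MB batch fits in 40 MB, so the first check stays quiet,
        // but 2 * (2 * 20 MB + 8 MB) does not fit, so the second check falls to 1 partition.
        long est = 20 * MB, limit = 40 * MB;
        System.out.println("singleBatchExceedsLimit=" + singleBatchExceedsLimit(est, limit)
            + " partitions=" + choosePartitions(32, est, 2, limit));
    }
}
```

With these inputs the program prints "partitions=4 mask=0x3 bitsInMask=2" and then "singleBatchExceedsLimit=false partitions=1". The second line is the case described in the comment: the batch-versus-limit check stays silent, while the check that counts the 8MB overhead drops the operator to a single partition.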