From: "ASF GitHub Bot (JIRA)"
To: issues@drill.apache.org
Reply-To: dev@drill.apache.org
Date: Sat, 3 Jun 2017 01:18:04 +0000 (UTC)
Subject: [jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator

    [ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035741#comment-16035741 ]

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r119975302

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
 
   @Override
   public int getOutputCount() {
-    // return outputCount;
     return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-    if (htable != null) {
-      htable.clear();
-      htable = null;
-    }
+    if ( schema == null ) { return; } // not set up; nothing to clean
+    for ( int i = 0; i < numPartitions; i++) {
+      if (htables[i] != null) {
+        htables[i].clear();
+        htables[i] = null;
+      }
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = null;
+      }
+
+      // delete any (still active) output spill file
+      if ( outputStream[i] != null && spillFiles[i] != null) {
+        try {
+          spillSet.delete(spillFiles[i]);
+        } catch(IOException e) {
+          logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i]);
+        }
+      }
+    }
+    // delete any spill file left in unread spilled partitions
+    while ( ! spilledPartitionsList.isEmpty() ) {
+      SpilledPartition sp = spilledPartitionsList.remove(0);
+      try {
+        spillSet.delete(sp.spillFile);
+      } catch(IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}", sp.spillFile);
+      }
+    }
+    spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
     outStartIdxHolder = null;
     outNumRecordsHolder = null;
+  }
 
-    if (batchHolders != null) {
-      for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches),
+  // then reallocate them in pristine state to allow the partition to continue receiving rows
+  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
+    assert htables[part] != null;
+    htables[part].reset();
+    if ( batchHolders[part] != null) {
+      for (BatchHolder bh : batchHolders[part]) {
         bh.clear();
       }
-      batchHolders.clear();
-      batchHolders = null;
+      batchHolders[part].clear();
     }
+    batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
   }
 
-// private final AggOutcome setOkAndReturn() {
-//   this.outcome = IterOutcome.OK;
-//   for (VectorWrapper<?> v : outgoing) {
-//     v.getValueVector().getMutator().setValueCount(outputCount);
-//   }
-//   return AggOutcome.RETURN_OUTCOME;
-// }
 
   private final void incIndex() {
     underlyingIndex++;
     if (underlyingIndex >= incoming.getRecordCount()) {
       currentIndex = Integer.MAX_VALUE;
       return;
    }
-    currentIndex = getVectorIndex(underlyingIndex);
+    try { currentIndex = getVectorIndex(underlyingIndex); }
+    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
  }
 
   private final void resetIndex() {
     underlyingIndex = -1;
     incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+    return outputStream[part] != null;
+  }
+
+  /**
+   * Which partition to choose for flushing out (i.e., spill or return)?
+   * - The current partition (to which a new batch holder is added) has a priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it is already spilled,
+   *   but spilling too few rows (e.g., a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more memory freed.
+   * Need to weigh the above three options.
+   *
+   * @param currPart - The partition that hit the memory limit (gets a priority)
+   * @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionToFlush(int currPart) {
+    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+    int currPartSize = batchHolders[currPart].size();
+    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
+    // first find the largest spilled partition
+    int maxSizeSpilled = -1;
+    int indexMaxSpilled = -1;
+    for (int isp = 0; isp < numPartitions; isp++ ) {
+      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+        maxSizeSpilled = batchHolders[isp].size();
+        indexMaxSpilled = isp;
+      }
+    }
+    // Give the current (if already spilled) some priority
+    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+      maxSizeSpilled = currPartSize;
+      indexMaxSpilled = currPart;
+    }
+    // now find the largest non-spilled partition
+    int maxSize = -1;
+    int indexMax = -1;
+    // Use the largest spilled (if found) as a base line, with a factor of 4
+    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+      indexMax = indexMaxSpilled;
+      maxSize = 4 * maxSizeSpilled;
+    }
+    for ( int insp = 0; insp < numPartitions; insp++) {
+      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+        indexMax = insp;
+        maxSize = batchHolders[insp].size();
+      }
+    }
+    // again - priority to the current partition
+    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+      return currPart;
+    }
+    if ( maxSize <= 1 ) { // Cannot make progress by spilling a single batch!
+      return -1; // try skipping this spill
+    }
+    return indexMax;
+  }
+
+  /**
+   * Iterate through the batches of the given partition, writing them to a file
+   *
+   * @param part The partition (number) to spill
+   */
+  private void spillAPartition(int part) {
+
+    ArrayList<BatchHolder> currPartition = batchHolders[part];
+    rowsInPartition = 0;
+    if ( EXTRA_DEBUG_SPILL ) {
+      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
+    }
+
+    if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
+
+    // If this is the first spill for this partition, create an output stream
+    if ( ! isSpilled(part) ) {
+
+      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+
+      try {
+        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+      } catch (IOException e) {
+        throw new DrillRuntimeException("Hash Aggregation failed to open spill file: " + spillFiles[part]);
+      }
+    }
+
+    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
+
+      // get the number of records in the batch holder that are pending output
+      int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+      rowsInPartition += numPendingOutput; // for logging
+      rowsSpilled += numPendingOutput;
+
+      allocateOutgoing(numPendingOutput);
+
+      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+      int numOutputRecords = outNumRecordsHolder.value;
+
+      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+      // set the value count for outgoing batch value vectors
+      /* int i = 0; */
+      for (VectorWrapper<?> v : outgoing) {
+        v.getValueVector().getMutator().setValueCount(numOutputRecords);
+        /*
+        // print out the first row to be spilled ( varchar, varchar, bigint )
+        try {
+          if (i++ < 2) {
+            NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
+            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+          } else {
+            NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
+            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+          }
+        } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}", e); }
+        */
+      }
+
+      outContainer.setRecordCount(numPendingOutput);
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
+      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+      Stopwatch watch = Stopwatch.createStarted();
+      try {
+        outputBatch.writeToStream(outputStream[part]);
+      } catch (IOException e) {
+        throw new DrillRuntimeException("Hash Aggregation failed to write to output stream: " + outputStream[part].toString());
+      }
+      outContainer.zeroVectors();
+      logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+    }
+
+    spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
+
+    logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
+  }
+
+  private void addBatchHolder(int part) {
+
     BatchHolder bh = newBatchHolder();
-    batchHolders.add(bh);
+    batchHolders[part].add(bh);
     if (EXTRA_DEBUG_1) {
-      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
     }
     bh.setup();
   }
 
-  // Overridden in the generated class when created as plain Java code.
-
+  // These methods are overridden in the generated class when created as plain Java code.
   protected BatchHolder newBatchHolder() {
     return new BatchHolder();
   }
 
+  protected SpilledRecordbatch newSpilledRecordBatch(String arg1, int arg2, FragmentContext arg4, BatchSchema arg5, OperatorContext arg6) {
+    return new SpilledRecordbatch(arg1, arg2, arg4, arg5, arg6);
+  }
+
+  /**
+   * Output the next batch from partition "nextPartitionToReturn"
+   *
+   * @return iteration outcome (e.g., OK, NONE ...)
+   */
   @Override
   public IterOutcome outputCurrentBatch() {
-    if (outBatchIndex >= batchHolders.size()) {
-      this.outcome = IterOutcome.NONE;
-      return outcome;
+
+    // when incoming was an empty batch, just finish up
+    if ( schema == null ) {
+      logger.trace("Incoming was empty; output is an empty batch.");
+      this.outcome = IterOutcome.NONE; // no records were read
+      allFlushed = true;
+      return this.outcome;
--- End diff --

This code was changed anyway, as part of eliminating RESTART.


> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradual spilling of memory to disk as the available memory becomes too small to allow in-memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
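
For clarity, the second-phase partition-selection heuristic documented in the chooseAPartitionToFlush() javadoc above can be restated as a small, self-contained function. The sketch below is illustrative only: the names (choosePartitionToFlush, batchCounts, spilled) are hypothetical, not Drill's, and it covers only the 2nd-phase decision (in the 1st phase the operator simply picks the current partition).

    // A simplified, standalone sketch of the heuristic described above.
    // All names here are hypothetical; batch counts stand in for partition size.
    public class FlushPartitionChooser {

      // Weigh three candidates: the largest already-spilled partition (its size
      // is multiplied by 4, since spilling it again costs no extra file), the
      // largest non-spilled partition, and the current partition (whose last
      // batch holder just filled up). Returns -1 when no partition holds more
      // than one batch, i.e., spilling could not make progress.
      static int choosePartitionToFlush(int currPart, int[] batchCounts, boolean[] spilled) {
        int currPartSize = batchCounts[currPart] == 1 ? -1 : batchCounts[currPart];

        // find the largest already-spilled partition
        int maxSizeSpilled = -1, indexMaxSpilled = -1;
        for (int p = 0; p < batchCounts.length; p++) {
          if (spilled[p] && batchCounts[p] > maxSizeSpilled) {
            maxSizeSpilled = batchCounts[p];
            indexMaxSpilled = p;
          }
        }
        if (spilled[currPart] && currPartSize + 1 >= maxSizeSpilled) {
          maxSizeSpilled = currPartSize;
          indexMaxSpilled = currPart;
        }

        // find the largest non-spilled partition, using 4x the spilled size as a baseline
        int maxSize = -1, indexMax = -1;
        if (indexMaxSpilled > -1 && maxSizeSpilled > 1) {
          indexMax = indexMaxSpilled;
          maxSize = 4 * maxSizeSpilled;
        }
        for (int p = 0; p < batchCounts.length; p++) {
          if (!spilled[p] && batchCounts[p] > maxSize) {
            maxSize = batchCounts[p];
            indexMax = p;
          }
        }
        if (!spilled[currPart] && currPartSize + 1 >= maxSize) { return currPart; }
        return maxSize <= 1 ? -1 : indexMax; // a single batch is not worth spilling
      }

      public static void main(String[] args) {
        // with no partition spilled yet, the largest one (partition 2, 6 batches) wins
        int[] counts = {3, 1, 6, 2};
        boolean[] spilled = {false, false, false, false};
        System.out.println(choosePartitionToFlush(1, counts, spilled)); // prints 2
      }
    }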
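
The issue description is compact, so here is a minimal, self-contained sketch of the overall technique it names, using a plain Java map-based count aggregation instead of Drill's value vectors. Everything in it (class and method names, the spill-file format) is hypothetical: the real operator spills whole record batches through VectorAccessibleSerializable, chooses its victim with the weighted heuristic shown in the diff, and may re-partition a spilled partition over further cycles when it still does not fit, which this toy one-pass merge does not attempt.

    // Gradual spill-to-disk for a hash aggregation, as a toy (assumed names).
    import java.io.*;
    import java.nio.file.*;
    import java.util.*;

    public class SpillingCountAggregator {
      static final int NUM_PARTITIONS = 4;
      static final int MAX_IN_MEMORY_KEYS = 1000; // stand-in for a memory budget

      @SuppressWarnings("unchecked")
      public static Map<String, Long> aggregate(Iterator<String> keys, Path spillDir) throws IOException {
        Map<String, Long>[] partitions = new HashMap[NUM_PARTITIONS];
        DataOutputStream[] spills = new DataOutputStream[NUM_PARTITIONS];
        for (int p = 0; p < NUM_PARTITIONS; p++) { partitions[p] = new HashMap<>(); }

        int inMemoryKeys = 0;
        while (keys.hasNext()) {
          String key = keys.next();
          int part = Math.floorMod(key.hashCode(), NUM_PARTITIONS);
          // merge() returns 1 only when the key is new; then check the budget
          if (partitions[part].merge(key, 1L, Long::sum) == 1L && ++inMemoryKeys > MAX_IN_MEMORY_KEYS) {
            // over budget: spill one partition (largest here; Drill weighs more factors)
            int victim = 0;
            for (int p = 1; p < NUM_PARTITIONS; p++) {
              if (partitions[p].size() > partitions[victim].size()) { victim = p; }
            }
            if (spills[victim] == null) { // first spill of this partition opens its file
              spills[victim] = new DataOutputStream(new BufferedOutputStream(
                  Files.newOutputStream(spillDir.resolve("spill_" + victim))));
            }
            for (Map.Entry<String, Long> e : partitions[victim].entrySet()) {
              spills[victim].writeUTF(e.getKey());
              spills[victim].writeLong(e.getValue());
            }
            inMemoryKeys -= partitions[victim].size();
            partitions[victim].clear(); // partition keeps receiving rows afterwards
          }
        }

        // merge phase: fold each spill file back into its (now partial) partition
        Map<String, Long> result = new HashMap<>();
        for (int p = 0; p < NUM_PARTITIONS; p++) {
          if (spills[p] != null) {
            spills[p].close();
            try (DataInputStream in = new DataInputStream(new BufferedInputStream(
                Files.newInputStream(spillDir.resolve("spill_" + p))))) {
              while (in.available() > 0) {
                partitions[p].merge(in.readUTF(), in.readLong(), Long::sum);
              }
            }
          }
          result.putAll(partitions[p]);
        }
        return result;
      }
    }

The property this sketch shares with the real operator is gradual degradation: partitions that stay small are aggregated entirely in memory, and only the overflowing ones pay the disk I/O cost.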