Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5A4BF200A01 for ; Tue, 3 May 2016 19:50:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 597321609F8; Tue, 3 May 2016 19:50:46 +0200 (CEST) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A73C51609F6 for ; Tue, 3 May 2016 19:50:45 +0200 (CEST) Received: (qmail 56661 invoked by uid 500); 3 May 2016 17:50:44 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 56554 invoked by uid 99); 3 May 2016 17:50:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 May 2016 17:50:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 83013DFE04; Tue, 3 May 2016 17:50:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: parthc@apache.org To: commits@drill.apache.org Date: Tue, 03 May 2016 17:50:45 -0000 Message-Id: <2ff8e2d6d4634ecea6a95b08007bd88f@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/5] drill git commit: DRILL-2100: Added deleting temporary spill directories when query is finished. archived-at: Tue, 03 May 2016 17:50:46 -0000 DRILL-2100: Added deleting temporary spill directories when query is finished. This closes #454 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/38e1016c Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/38e1016c Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/38e1016c Branch: refs/heads/master Commit: 38e1016c49786acaacb153ee37784b3ce3023eb5 Parents: 1a89a7f Author: Vitalii Diravka Authored: Mon Mar 28 18:05:22 2016 +0000 Committer: Parth Chandra Committed: Tue May 3 10:50:09 2016 -0700 ---------------------------------------------------------------------- .../physical/impl/xsort/ExternalSortBatch.java | 31 ++++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/38e1016c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 0ee518e..32df705 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -22,8 +22,10 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; @@ -72,6 +74,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import com.google.common.base.Joiner; import com.google.common.base.Stopwatch; @@ -116,6 +119,7 @@ public class ExternalSortBatch extends AbstractRecordBatch { private boolean first = true; private int targetRecordCount; private final String fileName; + private Set currSpillDirs = Sets.newTreeSet(); private int firstSpillBatchCount = 0; private int peakNumBatches = -1; @@ -158,7 +162,7 @@ public class ExternalSortBatch extends AbstractRecordBatch { copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier", PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION); FragmentHandle handle = context.getHandle(); - fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()), + fileName = String.format("%s_majorfragment%s_minorfragment%s_operator%s", QueryIdHelper.getQueryId(handle.getQueryId()), handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId()); } @@ -223,7 +227,19 @@ public class ExternalSortBatch extends AbstractRecordBatch { if (mSorter != null) { mSorter.clear(); } - + for(Iterator iter = this.currSpillDirs.iterator(); iter.hasNext(); iter.remove()) { + Path path = (Path)iter.next(); + try { + if (fs != null && path != null && fs.exists(path)) { + if (fs.delete(path, true)) { + fs.cancelDeleteOnExit(path); + } + } + } catch (IOException e) { + // since this is meant to be used in a batches's cleanup, we don't propagate the exception + logger.warn("Unable to delete spill directory " + path, e); + } + } } } @@ -554,7 +570,16 @@ public class ExternalSortBatch extends AbstractRecordBatch { c1.buildSchema(BatchSchema.SelectionVectorMode.NONE); c1.setRecordCount(count); - String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++); + String spillDir = dirs.next(); + Path currSpillPath = new Path(Joiner.on("/").join(spillDir, fileName)); + currSpillDirs.add(currSpillPath); + String outputFile = Joiner.on("/").join(currSpillPath, spillCount++); + try { + fs.deleteOnExit(currSpillPath); + } catch (IOException e) { + // since this is meant to be used in a batches's spilling, we don't propagate the exception + logger.warn("Unable to mark spill directory " + currSpillPath + " for deleting on exit", e); + } stats.setLongStat(Metric.SPILL_COUNT, spillCount); BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext); try (AutoCloseable a = AutoCloseables.all(batchGroupList)) {