Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9324D18A6B for ; Fri, 15 May 2015 02:52:07 +0000 (UTC) Received: (qmail 84396 invoked by uid 500); 15 May 2015 02:52:07 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 84364 invoked by uid 500); 15 May 2015 02:52:07 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 84355 invoked by uid 99); 15 May 2015 02:52:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 May 2015 02:52:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52B9FE0978; Fri, 15 May 2015 02:52:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: smp@apache.org To: commits@drill.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: drill git commit: DRILL-3065: Ensure the selection vectors in mSort to be closed after failure happens Date: Fri, 15 May 2015 02:52:07 +0000 (UTC) Repository: drill Updated Branches: refs/heads/master 7f575df33 -> 5c4a9b212 DRILL-3065: Ensure the selection vectors in mSort to be closed after failure happens Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5c4a9b21 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5c4a9b21 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5c4a9b21 Branch: refs/heads/master Commit: 5c4a9b2129dc7c2ae4a4fca9640ddbf990da126b Parents: 7f575df Author: Hsuan-Yi Chu Authored: Wed May 13 17:17:53 2015 -0700 Committer: Hsuan-Yi Chu Committed: Thu May 14 19:28:26 2015 -0700 ---------------------------------------------------------------------- .../physical/impl/xsort/ExternalSortBatch.java | 15 ++++++ .../exec/physical/impl/xsort/MSortTemplate.java | 11 ++++ .../drill/exec/physical/impl/xsort/MSorter.java | 1 + .../exec/server/TestDrillbitResilience.java | 54 +++++++++++++++++++- 4 files changed, 79 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 529a6ca..de4a86e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -63,6 +63,7 @@ import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.store.ischema.Records; +import org.apache.drill.exec.testing.ExecutionControlsInjector; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -112,6 +113,10 @@ public class ExternalSortBatch extends AbstractRecordBatch { private final String fileName; private int firstSpillBatchCount = 0; + public static final String INTERRUPTION_AFTER_SORT = "after-sort"; + public static final String INTERRUPTION_AFTER_SETUP = "after-setup"; + private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(ExternalSortBatch.class); + public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context, true); this.incoming = incoming; @@ -172,6 +177,10 @@ public class ExternalSortBatch extends AbstractRecordBatch { } copierAllocator.close(); super.close(); + + if(mSorter != null) { + mSorter.clear(); + } } } @@ -366,12 +375,18 @@ public class ExternalSortBatch extends AbstractRecordBatch { sv4 = builder.getSv4(); mSorter = createNewMSorter(); mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container); + + // For testing memory-leak purpose, inject exception after mSorter finishes setup + injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SETUP); mSorter.sort(this.container); // sort may have prematurely exited due to should continue returning false. if (!context.shouldContinue()) { return IterOutcome.STOP; } + + // For testing memory-leak purpose, inject exception after mSorter finishes sorting + injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SORT); sv4 = mSorter.getSV4(); long t = watch.elapsed(TimeUnit.MICROSECONDS); http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 9acae9e..9b21ae3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -171,6 +171,17 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ return doEval(sv1, sv2); } + @Override + public void clear() { + if(vector4 != null) { + vector4.clear(); + } + + if(aux != null) { + aux.clear(); + } + } + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing); public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex); http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java index d97ffc0..af8cbfb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java @@ -33,4 +33,5 @@ public interface MSorter { public static TemplateClassDefinition TEMPLATE_DEFINITION = new TemplateClassDefinition(MSorter.class, MSortTemplate.class); + public void clear(); } http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java index f95fbe1..8552ec1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java @@ -48,10 +48,13 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.physical.impl.ScreenCreator; import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec; +import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch; import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator; +import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch; import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; +import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -682,16 +685,21 @@ public class TestDrillbitResilience extends DrillTest { * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc. */ private static void assertFailsWithException(final String controls, final Class exceptionClass, - final String exceptionDesc) { + final String exceptionDesc, final String query) { setControls(controls); final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(); - QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener); + QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener); final Pair result = listener.waitForCompletion(); final QueryState state = result.getFirst(); assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED); assertExceptionInjected(result.getSecond(), exceptionClass, exceptionDesc); } + private static void assertFailsWithException(final String controls, final Class exceptionClass, + final String exceptionDesc) { + assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY); + } + @Test // Completion TC 2: failed query - before query is executed - while sql parsing public void failsWhenParsing() { final String exceptionDesc = "sql-parsing"; @@ -791,4 +799,46 @@ public class TestDrillbitResilience extends DrillTest { createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1); assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData()); } + + @Test // DRILL-3065 + public void testInterruptingAfterMSorterSorting() { + final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name"; + Class typeOfException = RuntimeException.class; + + final long before = countAllocatedMemory(); + final String controls = createSingleException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SORT, typeOfException); + assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SORT, query); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); + } + + @Test // DRILL-3085 + public void testInterruptingAfterMSorterSetup() { + final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name"; + Class typeOfException = RuntimeException.class; + + final long before = countAllocatedMemory(); + final String controls = createSingleException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, typeOfException); + assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, query); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); + } + + private long countAllocatedMemory() { + // wait to make sure all fragments finished cleaning up + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // just ignore + } + + long allocated = 0; + for (String name : drillbits.keySet()) { + allocated += drillbits.get(name).getContext().getAllocator().getAllocatedMemory(); + } + + return allocated; + } }