Return-Path: X-Original-To: apmail-phoenix-commits-archive@minotaur.apache.org Delivered-To: apmail-phoenix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 89A26F509 for ; Thu, 13 Nov 2014 07:19:35 +0000 (UTC) Received: (qmail 41480 invoked by uid 500); 13 Nov 2014 07:07:26 -0000 Delivered-To: apmail-phoenix-commits-archive@phoenix.apache.org Received: (qmail 41451 invoked by uid 500); 13 Nov 2014 07:07:26 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 41437 invoked by uid 99); 13 Nov 2014 07:07:26 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Nov 2014 07:07:26 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 985CAA135D1; Thu, 13 Nov 2014 07:06:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jamestaylor@apache.org To: commits@phoenix.apache.org Date: Thu, 13 Nov 2014 07:07:05 -0000 Message-Id: In-Reply-To: <26d632035d5e4ff9ae87eb64cea66a3b@git.apache.org> References: <26d632035d5e4ff9ae87eb64cea66a3b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [8/9] phoenix git commit: PHOENIX-1448 Fix resource leak when work rejected by thread executor PHOENIX-1448 Fix resource leak when work rejected by thread executor Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8875c86d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8875c86d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8875c86d Branch: refs/heads/3.2 Commit: 8875c86d6ed9b07a30f08bc656c00f5654a4469a Parents: cfe5454 Author: James Taylor Authored: 
Wed Nov 12 18:59:25 2014 -0800 Committer: James Taylor Committed: Wed Nov 12 23:05:00 2014 -0800 ---------------------------------------------------------------------- .../phoenix/iterate/BaseResultIterators.java | 22 +++++++++++++------- .../phoenix/iterate/ParallelIterators.java | 6 ++++-- .../apache/phoenix/iterate/SerialIterators.java | 6 ++++-- 3 files changed, 22 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8875c86d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 7f635c3..293063e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -481,13 +481,17 @@ public abstract class BaseResultIterators extends ExplainTable implements Result boolean isReverse = ScanUtil.isReversed(scan); final ConnectionQueryServices services = context.getConnection().getQueryServices(); ReadOnlyProps props = services.getProps(); - int numSplits = size(); - List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits); - final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> futures = Lists.newArrayListWithExpectedSize(numSplits); + int numScans = size(); + // Capture all iterators so that if something goes wrong, we close them all + // The iterators list is based on the submission of work, so it may not + // contain them all (for example if work was rejected from the queue) + List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size()); + List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans); + final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> futures = Lists.newArrayListWithExpectedSize(numScans); allFutures.add(futures); SQLException toThrow = null; try { -
submitWork(scans, futures, splits.size()); + submitWork(scans, futures, allIterators, splits.size()); int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS); boolean clearedCache = false; for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) { @@ -514,7 +518,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // as we need these to be in order addIterator(iterators, concatIterators); concatIterators = Collections.emptyList(); - submitWork(newNestedScans, newFutures, newNestedScans.size()); + submitWork(newNestedScans, newFutures, allIterators, newNestedScans.size()); allFutures.add(newFutures); for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) { for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) { @@ -550,7 +554,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } finally { try { - SQLCloseables.closeAll(iterators); + SQLCloseables.closeAll(allIterators); } catch (Exception e) { if (toThrow == null) { toThrow = ServerUtil.parseServerException(e); @@ -578,7 +582,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) { for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) { for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) { - if (futurePair != null) { // FIXME: null check should not be necessary + // When work is rejected, we may have null futurePair entries, because + // we randomize these and set them as they're submitted.
+ if (futurePair != null) { Future<PeekingResultIterator> future = futurePair.getSecond(); if (future != null) { cancelledWork |= future.cancel(false); @@ -622,7 +628,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result abstract protected String getName(); abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - int estFlattenedSize); + List<PeekingResultIterator> allIterators, int estFlattenedSize); @Override public int size() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8875c86d/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index f7661fc..acf401c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -54,7 +54,7 @@ public class ParallelIterators extends BaseResultIterators { @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - int estFlattenedSize) { + final List<PeekingResultIterator> allIterators, int estFlattenedSize) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list.
By shuffling the scans // we get better utilization of the cluster since our thread executor @@ -86,7 +86,9 @@ public class ParallelIterators extends BaseResultIterators { if (logger.isDebugEnabled()) { logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan); } - return iteratorFactory.newIterator(context, scanner, scan); + PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan); + allIterators.add(iterator); + return iterator; } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/8875c86d/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 44a1324..5ddf615 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -59,7 +59,7 @@ public class SerialIterators extends BaseResultIterators { @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - int estFlattenedSize) { + final List<PeekingResultIterator> allIterators, int estFlattenedSize) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor @@ -86,7 +86,9 @@ public class SerialIterators extends BaseResultIterators { concatIterators.add(iteratorFactory.newIterator(context, scanner, scan)); } PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators); - return new LimitingPeekingResultIterator(concatIterator, limit); + PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit); + allIterators.add(iterator); + return iterator; } /**