phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [8/9] phoenix git commit: PHOENIX-1448 Fix resource leak when work rejected by thread executor
Date Thu, 13 Nov 2014 07:07:05 GMT
PHOENIX-1448 Fix resource leak when work rejected by thread executor


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8875c86d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8875c86d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8875c86d

Branch: refs/heads/3.2
Commit: 8875c86d6ed9b07a30f08bc656c00f5654a4469a
Parents: cfe5454
Author: James Taylor <jtaylor@salesforce.com>
Authored: Wed Nov 12 18:59:25 2014 -0800
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Wed Nov 12 23:05:00 2014 -0800

----------------------------------------------------------------------
 .../phoenix/iterate/BaseResultIterators.java    | 22 +++++++++++++-------
 .../phoenix/iterate/ParallelIterators.java      |  6 ++++--
 .../apache/phoenix/iterate/SerialIterators.java |  6 ++++--
 3 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8875c86d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 7f635c3..293063e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -481,13 +481,17 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         boolean isReverse = ScanUtil.isReversed(scan);
         final ConnectionQueryServices services = context.getConnection().getQueryServices();
         ReadOnlyProps props = services.getProps();
-        int numSplits = size();
-        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
-        final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numSplits);
+        int numScans = size();
+        // Capture all iterators so that if something goes wrong, we close them all
+        // The iterators list is based on the submission of work, so it may not
+        // contain them all (for example if work was rejected from the queue)
+        List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size());
+        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
+        final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
         allFutures.add(futures);
         SQLException toThrow = null;
         try {
-            submitWork(scans, futures, splits.size());
+            submitWork(scans, futures, allIterators, splits.size());
             int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
@@ -514,7 +518,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             // as we need these to be in order
                             addIterator(iterators, concatIterators);
                             concatIterators = Collections.emptyList();
-                            submitWork(newNestedScans, newFutures, newNestedScans.size());
+                            submitWork(newNestedScans, newFutures, allIterators, newNestedScans.size());
                             allFutures.add(newFutures);
                             for (List<Pair<Scan,Future<PeekingResultIterator>>> newFuture : reverseIfNecessary(newFutures, isReverse)) {
                                 for (Pair<Scan,Future<PeekingResultIterator>> newScanPair : reverseIfNecessary(newFuture, isReverse)) {
@@ -550,7 +554,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         }
                     } finally {
                         try {
-                            SQLCloseables.closeAll(iterators);
+                            SQLCloseables.closeAll(allIterators);
                         } catch (Exception e) {
                             if (toThrow == null) {
                                 toThrow = ServerUtil.parseServerException(e);
@@ -578,7 +582,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
             for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
                 for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
-                    if (futurePair != null) { // FIXME: null check should not be necessary
+                    // When work is rejected, we may have null futurePair entries, because
+                    // we randomize these and set them as they're submitted.
+                    if (futurePair != null) {
                         Future<PeekingResultIterator> future = futurePair.getSecond();
                         if (future != null) {
                             cancelledWork |= future.cancel(false);
@@ -622,7 +628,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            int estFlattenedSize);
+            List<PeekingResultIterator> allIterators, int estFlattenedSize);
 
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8875c86d/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index f7661fc..acf401c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -54,7 +54,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            int estFlattenedSize) {
+            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -86,7 +86,9 @@ public class ParallelIterators extends BaseResultIterators {
                     if (logger.isDebugEnabled()) {
                         logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan);
                     }
-                    return iteratorFactory.newIterator(context, scanner, scan);
+                    PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan);
+                    allIterators.add(iterator);
+                    return iterator;
                 }
 
                 /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8875c86d/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 44a1324..5ddf615 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -59,7 +59,7 @@ public class SerialIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            int estFlattenedSize) {
+            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -86,7 +86,9 @@ public class SerialIterators extends BaseResultIterators {
 	                    concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
                 	}
                 	PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
-                	return new LimitingPeekingResultIterator(concatIterator, limit);
+                	PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit);
+                    allIterators.add(iterator);
+                    return iterator;
                 }
 
                 /**


Mime
View raw message