phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sama...@apache.org
Subject phoenix git commit: PHOENIX-1894 Iterators in BaseResultIterators#submitWork should be added to a thread safe collection.
Date Mon, 20 Apr 2015 23:32:36 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.3 06e0d6102 -> c9daef5dd


PHOENIX-1894 Iterators in BaseResultIterators#submitWork should be added to a thread safe collection.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c9daef5d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c9daef5d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c9daef5d

Branch: refs/heads/4.3
Commit: c9daef5dd14c7008cfe82957488831387e77cafc
Parents: 06e0d61
Author: Samarth <samarth.jain@salesforce.com>
Authored: Mon Apr 20 16:32:25 2015 -0700
Committer: Samarth <samarth.jain@salesforce.com>
Committed: Mon Apr 20 16:32:25 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/phoenix/iterate/BaseResultIterators.java   | 6 ++++--
 .../java/org/apache/phoenix/iterate/ParallelIterators.java     | 3 ++-
 .../main/java/org/apache/phoenix/iterate/SerialIterators.java  | 3 ++-
 3 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9daef5d/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 9ac6a29..9d5fa33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -29,10 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -513,7 +515,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         // Capture all iterators so that if something goes wrong, we close them all
         // The iterators list is based on the submission of work, so it may not
         // contain them all (for example if work was rejected from the queue)
-        List<PeekingResultIterator> allIterators = Lists.newArrayListWithExpectedSize(this.splits.size());
+        Queue<PeekingResultIterator> allIterators = new ConcurrentLinkedQueue<>();
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
         final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
         allFutures.add(futures);
@@ -680,7 +682,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            List<PeekingResultIterator> allIterators, int estFlattenedSize);
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize);
     
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9daef5d/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index b74919b..97270ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_S
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -57,7 +58,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9daef5d/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index ded9344..6b3b5e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -60,7 +61,7 @@ public class SerialIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final List<PeekingResultIterator> allIterators, int estFlattenedSize) {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor


Mime
View raw message