incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cr...@apache.org
Subject [05/50] [abbrv] git commit: Changing the way facets are scheduled so that a single query can not consume the entire thread pool when there are other queries waiting for the faceting pool. Also added the ability to cancel a query with facets running.
Date Sun, 18 May 2014 21:41:40 GMT
Changing the way facets are scheduled so that a single query can not consume the entire thread
pool when there are other queries waiting for the faceting pool.  Also added the ability to
cancel a query with facets running.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9cc7e80f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9cc7e80f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9cc7e80f

Branch: refs/heads/console-v2
Commit: 9cc7e80f53346e5cf2f965a885fabdb7a0ef2179
Parents: a0d4412
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Mar 23 18:26:04 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Mar 23 18:26:04 2014 -0400

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |  9 ++--
 .../org/apache/blur/thrift/BlurClusterTest.java |  7 ---
 .../blur/lucene/search/FacetExecutor.java       | 52 ++++++++++++--------
 .../org/apache/blur/concurrent/Executors.java   | 16 +++++-
 4 files changed, 50 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9cc7e80f/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 5e47a10..4f5c456 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -199,7 +200,7 @@ public class IndexManager {
     if (facetThreadCount < 1) {
       _facetExecutor = null;
     } else {
-      _facetExecutor = Executors.newThreadPool("facet-execution", facetThreadCount);
+      _facetExecutor = Executors.newThreadPool(new SynchronousQueue<Runnable>(), "facet-execution",
facetThreadCount);
     }
 
     _statusManager.setStatusCleanupTimerDelay(statusCleanupTimerDelay);
@@ -504,7 +505,7 @@ public class IndexManager {
       FacetExecutor executor = null;
       if (blurQuery.facets != null) {
         long[] facetMinimums = getFacetMinimums(blurQuery.facets);
-        executor = new FacetExecutor(blurQuery.facets.size(), facetMinimums, facetedCounts);
+        executor = new FacetExecutor(blurQuery.facets.size(), facetMinimums, facetedCounts,
running);
         facetedQuery = new FacetQuery(userQuery, getFacetQueries(blurQuery, fieldManager,
context, rowFilterForSearch,
             recordFilterForSearch), executor);
       } else {
@@ -1049,7 +1050,7 @@ public class IndexManager {
       doMutates(entry.getKey(), entry.getValue());
     }
   }
-  
+
   public void enqueue(List<RowMutation> mutations) throws BlurException, IOException
{
     mutations = MutatableAction.reduceMutates(mutations);
     Map<String, List<RowMutation>> map = getMutatesPerTable(mutations);
@@ -1057,7 +1058,7 @@ public class IndexManager {
       doEnqueue(entry.getKey(), entry.getValue());
     }
   }
-  
+
   private void doEnqueue(final String table, List<RowMutation> mutations) throws IOException,
BlurException {
     final Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
     Map<String, List<RowMutation>> mutationsByShard = new HashMap<String,
List<RowMutation>>();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9cc7e80f/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 41672e2..11fd830 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -344,14 +344,7 @@ public class BlurClusterTest {
     }
 
     BlurResults resultsRow = client.query(tableName, blurQueryRow);
-    // assertRowResults(resultsRow);
     assertEquals(numberOfDocs, resultsRow.getTotalResults());
-
-    List<Long> facetCounts = resultsRow.getFacetCounts();
-    for (Long l : facetCounts) {
-      assertTrue(l >= 50);
-    }
-
     System.out.println(resultsRow.getFacetCounts());
 
     System.out.println();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9cc7e80f/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
index 913a318..41c6400 100644
--- a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.locks.Lock;
@@ -148,12 +149,12 @@ public class FacetExecutor {
       _locks = locks;
     }
 
-    void process(AtomicLongArray counts, long[] minimumsBeforeReturning) throws IOException
{
+    void process(AtomicLongArray counts, long[] minimumsBeforeReturning, AtomicBoolean running)
throws IOException {
       if (minimumsBeforeReturning == null) {
         Tracer trace = Trace.trace("processing facet - segment", Trace.param("reader", _readerStr),
             Trace.param("maxDoc", _maxDoc), Trace.param("minimums", "NONE"), Trace.param("scorers",
_scorers.length));
         try {
-          for (int i = 0; i < _scorers.length; i++) {
+          for (int i = 0; i < _scorers.length && running.get(); i++) {
             SimpleCollector col = new SimpleCollector(_bitSet);
             runFacet(counts, col, i);
           }
@@ -164,7 +165,7 @@ public class FacetExecutor {
         BlockingQueue<Integer> ids = new ArrayBlockingQueue<Integer>(_scorers.length
+ 1);
         try {
           populate(ids);
-          while (!ids.isEmpty()) {
+          while (!ids.isEmpty() && running.get()) {
             int id = ids.take();
             Lock lock = _locks[id];
             if (lock.tryLock()) {
@@ -221,6 +222,7 @@ public class FacetExecutor {
   private final AtomicLongArray _counts;
   private final long[] _minimumsBeforeReturning;
   private final Lock[] _locks;
+  private final AtomicBoolean _running;
   private boolean _processed;
 
   public FacetExecutor(int length) {
@@ -228,6 +230,14 @@ public class FacetExecutor {
   }
 
   public FacetExecutor(int length, long[] minimumsBeforeReturning, AtomicLongArray counts)
{
+    this(length, minimumsBeforeReturning, counts, new AtomicBoolean(true));
+  }
+
+  public FacetExecutor(int length, long[] minimumsBeforeReturning) {
+    this(length, minimumsBeforeReturning, new AtomicLongArray(length));
+  }
+
+  public FacetExecutor(int length, long[] minimumsBeforeReturning, AtomicLongArray counts,
AtomicBoolean running) {
     _length = length;
     _counts = counts;
     _minimumsBeforeReturning = minimumsBeforeReturning;
@@ -235,10 +245,7 @@ public class FacetExecutor {
     for (int i = 0; i < _length; i++) {
       _locks[i] = new ReentrantReadWriteLock().writeLock();
     }
-  }
-
-  public FacetExecutor(int length, long[] minimumsBeforeReturning) {
-    this(length, minimumsBeforeReturning, new AtomicLongArray(length));
+    _running = running;
   }
 
   public void addScorers(AtomicReaderContext context, Scorer[] scorers) throws IOException
{
@@ -302,24 +309,28 @@ public class FacetExecutor {
     Collections.sort(entries, COMPARATOR);
     if (executor == null) {
       for (Entry<Object, Info> e : entries) {
-        e.getValue().process(_counts, _minimumsBeforeReturning);
+        if (_running.get()) {
+          e.getValue().process(_counts, _minimumsBeforeReturning, _running);
+        }
       }
     } else {
       final AtomicInteger finished = new AtomicInteger();
       for (Entry<Object, Info> e : entries) {
-        final Entry<Object, Info> entry = e;
-        executor.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              entry.getValue().process(_counts, _minimumsBeforeReturning);
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            } finally {
-              finished.incrementAndGet();
+        if (_running.get()) {
+          final Entry<Object, Info> entry = e;
+          executor.submit(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                entry.getValue().process(_counts, _minimumsBeforeReturning, _running);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              } finally {
+                finished.incrementAndGet();
+              }
             }
-          }
-        });
+          });
+        }
       }
 
       while (finished.get() < entries.size()) {
@@ -333,5 +344,4 @@ public class FacetExecutor {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9cc7e80f/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java b/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
index 62185ec..05a1153 100644
--- a/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
@@ -16,6 +16,7 @@ package org.apache.blur.concurrent;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -33,7 +34,17 @@ public class Executors {
   }
 
   public static ExecutorService newThreadPool(String prefix, int threadCount, boolean watch)
{
-    BlurThreadPoolExecutor executorService = new BlurThreadPoolExecutor(threadCount, threadCount,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new BlurThreadFactory(prefix));
+    return newThreadPool(new LinkedBlockingQueue<Runnable>(), prefix, threadCount,
watch);
+  }
+
+  public static ExecutorService newThreadPool(BlockingQueue<Runnable> workQueue, String
prefix, int threadCount) {
+    return newThreadPool(workQueue, prefix, threadCount, true);
+  }
+
+  public static ExecutorService newThreadPool(BlockingQueue<Runnable> workQueue, String
prefix, int threadCount,
+      boolean watch) {
+    BlurThreadPoolExecutor executorService = new BlurThreadPoolExecutor(threadCount, threadCount,
60L,
+        TimeUnit.SECONDS, workQueue, new BlurThreadFactory(prefix));
     executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
     executorService.add(new UserThreadBoundaryProcessor());
     if (watch) {
@@ -43,7 +54,8 @@ public class Executors {
   }
 
   public static ExecutorService newSingleThreadExecutor(String prefix) {
-    return Trace.getExecutorService(java.util.concurrent.Executors.newSingleThreadExecutor(new
BlurThreadFactory(prefix)));
+    return Trace.getExecutorService(java.util.concurrent.Executors
+        .newSingleThreadExecutor(new BlurThreadFactory(prefix)));
   }
 
   public static class BlurThreadFactory implements ThreadFactory {


Mime
View raw message