incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/4] git commit: Adding threading to the facets.
Date Thu, 02 Jan 2014 15:45:41 GMT
Adding threading to the facets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/abe004bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/abe004bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/abe004bb

Branch: refs/heads/apache-blur-0.2
Commit: abe004bbf9ad67b8085a7860c6d1daa4f0b34f92
Parents: f8ba394
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jan 2 09:45:03 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jan 2 09:45:03 2014 -0500

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java     | 15 +++++++++++++--
 .../manager/writer/BlurIndexSimpleWriter.java     | 15 +++++++++++++--
 .../java/org/apache/blur/server/TableContext.java |  4 ++--
 .../apache/blur/thrift/ThriftBlurShardServer.java |  4 +++-
 .../org/apache/blur/manager/IndexManagerTest.java |  2 +-
 .../org/apache/blur/thrift/BlurClusterTest.java   | 18 +++++++++---------
 .../apache/blur/lucene/search/FacetExecutor.java  |  8 ++++++--
 .../java/org/apache/blur/utils/BlurConstants.java |  1 +
 .../src/main/resources/blur-default.properties    |  3 +++
 9 files changed, 51 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index e9ed231..c258769 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -142,6 +142,7 @@ public class IndexManager {
   private final ClusterStatus _clusterStatus;
   private final ExecutorService _executor;
   private final ExecutorService _mutateExecutor;
+  private final ExecutorService _facetExecutor;
 
   private final QueryStatusManager _statusManager = new QueryStatusManager();
   private final AtomicBoolean _closed = new AtomicBoolean(false);
@@ -159,7 +160,8 @@ public class IndexManager {
   public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);
 
   public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache
filterCache,
-      int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long
statusCleanupTimerDelay) {
+      int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long
statusCleanupTimerDelay,
+      int facetThreadCount) {
     _indexServer = indexServer;
     _clusterStatus = clusterStatus;
     _filterCache = filterCache;
@@ -193,6 +195,12 @@ public class IndexManager {
 
     _executor = Executors.newThreadPool("index-manager", _threadCount);
     _mutateExecutor = Executors.newThreadPool("index-manager-mutate", _mutateThreadCount);
+    if (facetThreadCount < 1) {
+      _facetExecutor = null;
+    } else {
+      _facetExecutor = Executors.newThreadPool("facet-execution", facetThreadCount);
+    }
+
     _statusManager.setStatusCleanupTimerDelay(statusCleanupTimerDelay);
     _statusManager.init();
     LOG.info("Init Complete");
@@ -205,6 +213,9 @@ public class IndexManager {
       _statusManager.close();
       _executor.shutdownNow();
       _mutateExecutor.shutdownNow();
+      if (_facetExecutor != null) {
+        _facetExecutor.shutdownNow();
+      }
       try {
         _indexServer.close();
       } catch (IOException e) {
@@ -497,7 +508,7 @@ public class IndexManager {
       }).merge(merger);
 
       if (executor != null) {
-        executor.processFacets(null);
+        executor.processFacets(_facetExecutor);
       }
       return merge;
     } catch (StopExecutionCollectorException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index e6cbba9..9e18c35 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -20,6 +20,7 @@ import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -113,7 +114,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
 
     _writerOpener = getWriterOpener(shardContext);
     _writerOpener.start();
-    
+
     _refresherThread = getRefresherThread();
     _refresherThread.start();
 
@@ -276,7 +277,17 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   @Override
   public void close() throws IOException {
     _isClosed.set(true);
-    IOUtils.cleanup(LOG, _indexImporter, _writer.get(), _indexReader.get());
+    IOUtils.cleanup(LOG, closeable(_commitThread), closeable(_refresherThread), _indexImporter,
_writer.get(),
+        _indexReader.get());
+  }
+
+  private Closeable closeable(final Thread thread) {
+    return new Closeable() {
+      @Override
+      public void close() throws IOException {
+        thread.interrupt();
+      }
+    };
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index a3647f5..aa363e3 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -50,7 +50,7 @@ import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
-import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
+//import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
 import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.thrift.generated.ScoreType;
@@ -331,7 +331,7 @@ public class TableContext {
       DirectoryReferenceFileGC gc, ExecutorService searchExecutor, BlurIndexCloser indexCloser,
       BlurIndexRefresher refresher, BlurIndexWarmup indexWarmup) throws IOException {
 
-    String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurIndexSimpleWriter.class.getName());
+    String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurNRTIndex.class.getName());
 
     Class<? extends BlurIndex> clazz;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index f9fef0b..d352dc1 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -21,6 +21,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_FACET_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_MAX_CLAUSE_COUNT;
@@ -214,10 +215,11 @@ public class ThriftBlurShardServer extends ThriftServer {
     int fetchCount = configuration.getInt(BLUR_SHARD_FETCHCOUNT, 100);
     int indexManagerThreadCount = configuration.getInt(BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT,
32);
     int mutateThreadCount = configuration.getInt(BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT, 32);
+    int facetThreadCount = configuration.getInt(BLUR_INDEXMANAGER_FACET_THREAD_COUNT, 16);
     long statusCleanupTimerDelay = TimeUnit.SECONDS.toMillis(10);
 
     final IndexManager indexManager = new IndexManager(indexServer, clusterStatus, filterCache,
maxHeapPerRowFetch,
-        fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay);
+        fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay,
facetThreadCount);
 
     final BlurShardServer shardServer = new BlurShardServer();
     shardServer.setIndexServer(indexServer);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index 9da2db1..beebdf6 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -117,7 +117,7 @@ public class IndexManagerTest {
     BlurFilterCache filterCache = new DefaultBlurFilterCache(new BlurConfiguration());
     long statusCleanupTimerDelay = 1000;
     indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache,
10000000, 100, 1, 1,
-        statusCleanupTimerDelay);
+        statusCleanupTimerDelay, 0);
     setupData();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 17e0828..38f6457 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -78,6 +78,8 @@ public class BlurClusterTest {
 
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_BlurClusterTest"));
   private static MiniCluster miniCluster;
+  
+  private int numberOfDocs = 1000;
 
   @BeforeClass
   public static void startCluster() throws IOException {
@@ -182,11 +184,10 @@ public class BlurClusterTest {
 
   public void testLoadTable() throws BlurException, TException, InterruptedException {
     Iface client = getClient();
-    int length = 250;
     int maxFacetValue = 100;
     List<RowMutation> mutations = new ArrayList<RowMutation>();
     Random random = new Random(1);
-    for (int i = 0; i < length; i++) {
+    for (int i = 0; i < numberOfDocs; i++) {
       String rowId = UUID.randomUUID().toString();
       RecordMutation mutation = BlurThriftHelper.newRecordMutation("test", rowId,
           BlurThriftHelper.newColumn("test", "value"),
@@ -207,7 +208,7 @@ public class BlurClusterTest {
     blurQueryRow.setCacheResult(false);
     BlurResults resultsRow = client.query("test", blurQueryRow);
     assertRowResults(resultsRow);
-    assertEquals(length, resultsRow.getTotalResults());
+    assertEquals(numberOfDocs, resultsRow.getTotalResults());
 
     BlurQuery blurQueryRecord = new BlurQuery();
     Query queryRecord = new Query();
@@ -216,7 +217,7 @@ public class BlurClusterTest {
     blurQueryRecord.setQuery(queryRecord);
     BlurResults resultsRecord = client.query("test", blurQueryRecord);
     assertRecordResults(resultsRecord);
-    assertEquals(length, resultsRecord.getTotalResults());
+    assertEquals(numberOfDocs, resultsRecord.getTotalResults());
 
     Schema schema = client.schema("test");
     assertFalse(schema.getFamilies().isEmpty());
@@ -234,7 +235,7 @@ public class BlurClusterTest {
 
     BlurResults resultsRow = client.query("test", blurQueryRow);
     // assertRowResults(resultsRow);
-    assertEquals(250, resultsRow.getTotalResults());
+    assertEquals(numberOfDocs, resultsRow.getTotalResults());
 
     for (BlurResult blurResult : resultsRow.getResults()) {
       System.out.println(blurResult);
@@ -258,7 +259,7 @@ public class BlurClusterTest {
 
     BlurResults resultsRow = client.query("test", blurQueryRow);
     // assertRowResults(resultsRow);
-    assertEquals(250, resultsRow.getTotalResults());
+    assertEquals(numberOfDocs, resultsRow.getTotalResults());
 
     System.out.println(resultsRow.getFacetCounts());
 
@@ -450,14 +451,13 @@ public class BlurClusterTest {
     System.out.println("===========================");
 
     Iface client = getClient();
-    int length = 250;
     BlurQuery blurQuery = new BlurQuery();
     blurQuery.setUseCacheIfPresent(false);
     Query query = new Query();
     query.setQuery("test.test:value");
     blurQuery.setQuery(query);
     BlurResults results1 = client.query("test", blurQuery);
-    assertEquals(length, results1.getTotalResults());
+    assertEquals(numberOfDocs, results1.getTotalResults());
     assertRowResults(results1);
 
     miniCluster.killShardServer(1);
@@ -468,7 +468,7 @@ public class BlurClusterTest {
     // This should block until shards have failed over
     client.shardServerLayout("test");
 
-    assertEquals(length, client.query("test", blurQuery).getTotalResults());
+    assertEquals(numberOfDocs, client.query("test", blurQuery).getTotalResults());
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
index 9544b3a..e3a24bf 100644
--- a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongArray;
 
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.search.Collector;
@@ -36,6 +38,8 @@ import org.apache.lucene.util.OpenBitSet;
 
 public class FacetExecutor {
 
+  private final Log LOG = LogFactory.getLog(FacetExecutor.class);
+
   static Comparator<Entry<Object, Info>> COMPARATOR = new Comparator<Entry<Object,
Info>>() {
     @Override
     public int compare(Entry<Object, Info> o1, Entry<Object, Info> o2) {
@@ -155,8 +159,8 @@ public class FacetExecutor {
       _infoMap.put(key, info);
     } else {
       AtomicReader reader = context.reader();
-      throw new IOException("Info about reader context [" + context + "] alread created,
existing Info [" + info
-          + "] current reader [" + reader + "].");
+      LOG.info("Info about reader context [{0}] alread created, existing Info [{1}] current
reader [{2}].", context,
+          info, reader);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 6c1ea4b..f1fc727 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -72,6 +72,7 @@ public class BlurConstants {
   public static final String BLUR_SHARD_INDEX_WARMUP_CLASS = "blur.shard.index.warmup.class";
   public static final String BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT = "blur.indexmanager.search.thread.count";
   public static final String BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT = "blur.indexmanager.mutate.thread.count";
+  public static final String BLUR_INDEXMANAGER_FACET_THREAD_COUNT = "blur.indexmanager.facet.thread.count";
   public static final String BLUR_SHARD_DATA_FETCH_THREAD_COUNT = "blur.shard.data.fetch.thread.count";
   public static final String BLUR_SHARD_INTERNAL_SEARCH_THREAD_COUNT = "blur.shard.internal.search.thread.count";
   public static final String BLUR_SHARD_WARMUP_THREAD_COUNT = "blur.shard.warmup.thread.count";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/abe004bb/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 41d0283..6cd6904 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -157,6 +157,9 @@ blur.indexmanager.search.thread.count=8
 # The number of thread used for parallel mutating in the index manager
 blur.indexmanager.mutate.thread.count=8
 
+# The number of thread used for parallel faceting in the index manager
+blur.indexmanager.facet.thread.count=8
+
 # The number of threads used for parallel searching in the index searchers
 blur.shard.internal.search.thread.count=8
 


Mime
View raw message