Updated Branches:
refs/heads/master 0400bad87 -> da26c9804
Adding threading to the facets.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/eda8a395
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/eda8a395
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/eda8a395
Branch: refs/heads/master
Commit: eda8a3959fb389d99b65587d3143624bd09e7329
Parents: 7c02250
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jan 2 09:45:03 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jan 2 10:46:24 2014 -0500
----------------------------------------------------------------------
.../org/apache/blur/manager/IndexManager.java | 15 +++++++++++++--
.../manager/writer/BlurIndexSimpleWriter.java | 15 +++++++++++++--
.../java/org/apache/blur/server/TableContext.java | 4 ++--
.../apache/blur/thrift/ThriftBlurShardServer.java | 4 +++-
.../org/apache/blur/manager/IndexManagerTest.java | 2 +-
.../org/apache/blur/thrift/BlurClusterTest.java | 18 +++++++++---------
.../apache/blur/lucene/search/FacetExecutor.java | 8 ++++++--
.../java/org/apache/blur/utils/BlurConstants.java | 1 +
.../src/main/resources/blur-default.properties | 3 +++
9 files changed, 51 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index e9ed231..c258769 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -142,6 +142,7 @@ public class IndexManager {
private final ClusterStatus _clusterStatus;
private final ExecutorService _executor;
private final ExecutorService _mutateExecutor;
+ private final ExecutorService _facetExecutor;
private final QueryStatusManager _statusManager = new QueryStatusManager();
private final AtomicBoolean _closed = new AtomicBoolean(false);
@@ -159,7 +160,8 @@ public class IndexManager {
public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);
public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache filterCache,
- int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long statusCleanupTimerDelay) {
+ int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long statusCleanupTimerDelay,
+ int facetThreadCount) {
_indexServer = indexServer;
_clusterStatus = clusterStatus;
_filterCache = filterCache;
@@ -193,6 +195,12 @@ public class IndexManager {
_executor = Executors.newThreadPool("index-manager", _threadCount);
_mutateExecutor = Executors.newThreadPool("index-manager-mutate", _mutateThreadCount);
+ if (facetThreadCount < 1) {
+ _facetExecutor = null;
+ } else {
+ _facetExecutor = Executors.newThreadPool("facet-execution", facetThreadCount);
+ }
+
_statusManager.setStatusCleanupTimerDelay(statusCleanupTimerDelay);
_statusManager.init();
LOG.info("Init Complete");
@@ -205,6 +213,9 @@ public class IndexManager {
_statusManager.close();
_executor.shutdownNow();
_mutateExecutor.shutdownNow();
+ if (_facetExecutor != null) {
+ _facetExecutor.shutdownNow();
+ }
try {
_indexServer.close();
} catch (IOException e) {
@@ -497,7 +508,7 @@ public class IndexManager {
}).merge(merger);
if (executor != null) {
- executor.processFacets(null);
+ executor.processFacets(_facetExecutor);
}
return merge;
} catch (StopExecutionCollectorException e) {
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index e6cbba9..9e18c35 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -20,6 +20,7 @@ import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
+import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -113,7 +114,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
_writerOpener = getWriterOpener(shardContext);
_writerOpener.start();
-
+
_refresherThread = getRefresherThread();
_refresherThread.start();
@@ -276,7 +277,17 @@ public class BlurIndexSimpleWriter extends BlurIndex {
@Override
public void close() throws IOException {
_isClosed.set(true);
- IOUtils.cleanup(LOG, _indexImporter, _writer.get(), _indexReader.get());
+ IOUtils.cleanup(LOG, closeable(_commitThread), closeable(_refresherThread), _indexImporter, _writer.get(),
+ _indexReader.get());
+ }
+
+ private Closeable closeable(final Thread thread) {
+ return new Closeable() {
+ @Override
+ public void close() throws IOException {
+ thread.interrupt();
+ }
+ };
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index a3647f5..aa363e3 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -50,7 +50,7 @@ import org.apache.blur.manager.indexserver.BlurIndexWarmup;
import org.apache.blur.manager.writer.BlurIndex;
import org.apache.blur.manager.writer.BlurIndexCloser;
import org.apache.blur.manager.writer.BlurIndexRefresher;
-import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
+//import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
import org.apache.blur.manager.writer.BlurNRTIndex;
import org.apache.blur.manager.writer.SharedMergeScheduler;
import org.apache.blur.thrift.generated.ScoreType;
@@ -331,7 +331,7 @@ public class TableContext {
DirectoryReferenceFileGC gc, ExecutorService searchExecutor, BlurIndexCloser indexCloser,
BlurIndexRefresher refresher, BlurIndexWarmup indexWarmup) throws IOException {
- String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurIndexSimpleWriter.class.getName());
+ String className = _blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurNRTIndex.class.getName());
Class<? extends BlurIndex> clazz;
try {
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index f9fef0b..d352dc1 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -21,6 +21,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_FACET_THREAD_COUNT;
import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT;
import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT;
import static org.apache.blur.utils.BlurConstants.BLUR_MAX_CLAUSE_COUNT;
@@ -214,10 +215,11 @@ public class ThriftBlurShardServer extends ThriftServer {
int fetchCount = configuration.getInt(BLUR_SHARD_FETCHCOUNT, 100);
int indexManagerThreadCount = configuration.getInt(BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT, 32);
int mutateThreadCount = configuration.getInt(BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT, 32);
+ int facetThreadCount = configuration.getInt(BLUR_INDEXMANAGER_FACET_THREAD_COUNT, 16);
long statusCleanupTimerDelay = TimeUnit.SECONDS.toMillis(10);
final IndexManager indexManager = new IndexManager(indexServer, clusterStatus, filterCache, maxHeapPerRowFetch,
- fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay);
+ fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay, facetThreadCount);
final BlurShardServer shardServer = new BlurShardServer();
shardServer.setIndexServer(indexServer);
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index 9da2db1..beebdf6 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -117,7 +117,7 @@ public class IndexManagerTest {
BlurFilterCache filterCache = new DefaultBlurFilterCache(new BlurConfiguration());
long statusCleanupTimerDelay = 1000;
indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache, 10000000, 100, 1, 1,
- statusCleanupTimerDelay);
+ statusCleanupTimerDelay, 0);
setupData();
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 90431a5..1055385 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -78,6 +78,8 @@ public class BlurClusterTest {
private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_BlurClusterTest"));
private static MiniCluster miniCluster;
+
+ private int numberOfDocs = 1000;
@BeforeClass
public static void startCluster() throws IOException {
@@ -182,11 +184,10 @@ public class BlurClusterTest {
public void testLoadTable() throws BlurException, TException, InterruptedException {
Iface client = getClient();
- int length = 250;
int maxFacetValue = 100;
List<RowMutation> mutations = new ArrayList<RowMutation>();
Random random = new Random(1);
- for (int i = 0; i < length; i++) {
+ for (int i = 0; i < numberOfDocs; i++) {
String rowId = UUID.randomUUID().toString();
RecordMutation mutation = BlurThriftHelper.newRecordMutation("test", rowId,
BlurThriftHelper.newColumn("test", "value"),
@@ -207,7 +208,7 @@ public class BlurClusterTest {
blurQueryRow.setCacheResult(false);
BlurResults resultsRow = client.query("test", blurQueryRow);
assertRowResults(resultsRow);
- assertEquals(length, resultsRow.getTotalResults());
+ assertEquals(numberOfDocs, resultsRow.getTotalResults());
BlurQuery blurQueryRecord = new BlurQuery();
Query queryRecord = new Query();
@@ -216,7 +217,7 @@ public class BlurClusterTest {
blurQueryRecord.setQuery(queryRecord);
BlurResults resultsRecord = client.query("test", blurQueryRecord);
assertRecordResults(resultsRecord);
- assertEquals(length, resultsRecord.getTotalResults());
+ assertEquals(numberOfDocs, resultsRecord.getTotalResults());
Schema schema = client.schema("test");
assertFalse(schema.getFamilies().isEmpty());
@@ -235,7 +236,7 @@ public class BlurClusterTest {
BlurResults resultsRow = client.query("test", blurQueryRow);
assertTrue(0 != resultsRow.getQuery().getStartTime());
// assertRowResults(resultsRow);
- assertEquals(250, resultsRow.getTotalResults());
+ assertEquals(numberOfDocs, resultsRow.getTotalResults());
for (BlurResult blurResult : resultsRow.getResults()) {
System.out.println(blurResult);
@@ -259,7 +260,7 @@ public class BlurClusterTest {
BlurResults resultsRow = client.query("test", blurQueryRow);
// assertRowResults(resultsRow);
- assertEquals(250, resultsRow.getTotalResults());
+ assertEquals(numberOfDocs, resultsRow.getTotalResults());
System.out.println(resultsRow.getFacetCounts());
@@ -451,14 +452,13 @@ public class BlurClusterTest {
System.out.println("===========================");
Iface client = getClient();
- int length = 250;
BlurQuery blurQuery = new BlurQuery();
blurQuery.setUseCacheIfPresent(false);
Query query = new Query();
query.setQuery("test.test:value");
blurQuery.setQuery(query);
BlurResults results1 = client.query("test", blurQuery);
- assertEquals(length, results1.getTotalResults());
+ assertEquals(numberOfDocs, results1.getTotalResults());
assertRowResults(results1);
miniCluster.killShardServer(1);
@@ -469,7 +469,7 @@ public class BlurClusterTest {
// This should block until shards have failed over
client.shardServerLayout("test");
- assertEquals(length, client.query("test", blurQuery).getTotalResults());
+ assertEquals(numberOfDocs, client.query("test", blurQuery).getTotalResults());
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
index 9544b3a..e3a24bf 100644
--- a/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/FacetExecutor.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.Collector;
@@ -36,6 +38,8 @@ import org.apache.lucene.util.OpenBitSet;
public class FacetExecutor {
+ private final Log LOG = LogFactory.getLog(FacetExecutor.class);
+
static Comparator<Entry<Object, Info>> COMPARATOR = new Comparator<Entry<Object, Info>>() {
@Override
public int compare(Entry<Object, Info> o1, Entry<Object, Info> o2) {
@@ -155,8 +159,8 @@ public class FacetExecutor {
_infoMap.put(key, info);
} else {
AtomicReader reader = context.reader();
- throw new IOException("Info about reader context [" + context + "] alread created, existing Info [" + info
- + "] current reader [" + reader + "].");
+ LOG.info("Info about reader context [{0}] alread created, existing Info [{1}] current reader [{2}].", context,
+ info, reader);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 6c1ea4b..f1fc727 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -72,6 +72,7 @@ public class BlurConstants {
public static final String BLUR_SHARD_INDEX_WARMUP_CLASS = "blur.shard.index.warmup.class";
public static final String BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT = "blur.indexmanager.search.thread.count";
public static final String BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT = "blur.indexmanager.mutate.thread.count";
+ public static final String BLUR_INDEXMANAGER_FACET_THREAD_COUNT = "blur.indexmanager.facet.thread.count";
public static final String BLUR_SHARD_DATA_FETCH_THREAD_COUNT = "blur.shard.data.fetch.thread.count";
public static final String BLUR_SHARD_INTERNAL_SEARCH_THREAD_COUNT = "blur.shard.internal.search.thread.count";
public static final String BLUR_SHARD_WARMUP_THREAD_COUNT = "blur.shard.warmup.thread.count";
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/eda8a395/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 41d0283..6cd6904 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -157,6 +157,9 @@ blur.indexmanager.search.thread.count=8
# The number of thread used for parallel mutating in the index manager
blur.indexmanager.mutate.thread.count=8
+# The number of thread used for parallel faceting in the index manager
+blur.indexmanager.facet.thread.count=8
+
# The number of threads used for parallel searching in the index searchers
blur.shard.internal.search.thread.count=8
|