incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Adding a way to change the BlurIndex implementation for a cluster or a table.
Date Thu, 12 Dec 2013 15:25:44 GMT
Updated Branches:
  refs/heads/apache-blur-0.2 a4169a999 -> 7ff903332


Adding a way to change the BlurIndex implementation for a cluster or a table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/3dc5b842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/3dc5b842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/3dc5b842

Branch: refs/heads/apache-blur-0.2
Commit: 3dc5b842ba4a3b836adf035206c7b157cf73e0e9
Parents: a4169a9
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Dec 12 10:24:55 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Dec 12 10:24:55 2013 -0500

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java     |  23 +-
 .../manager/indexserver/LocalIndexServer.java   |   2 +-
 .../apache/blur/manager/writer/BlurIndex.java   |  63 +++--
 .../blur/manager/writer/BlurIndexNRTSimple.java | 237 ++++++++++++++++++
 .../blur/manager/writer/BlurIndexReadOnly.java  |   3 +-
 .../blur/manager/writer/BlurIndexReader.java    |  12 +-
 .../blur/manager/writer/BlurNRTIndex.java       |   6 +-
 .../org/apache/blur/server/TableContext.java    |  49 ++++
 .../manager/writer/BlurIndexNRTSimpleTest.java  | 242 +++++++++++++++++++
 .../manager/writer/BlurIndexReaderTest.java     |   2 +-
 .../blur/manager/writer/BlurNRTIndexTest.java   |  33 ++-
 .../src/main/resources/blur-default.properties  |   3 +
 12 files changed, 610 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index f4a69ac..de45d29 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -48,7 +48,6 @@ import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexReadOnly;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
-import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
@@ -193,7 +192,7 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
 
       @Override
       public DistributedLayout createDistributedLayout(String table, List<String> shardList,
-          List<String> shardServerList, List<String> offlineShardServers, boolean readOnly) {
+          List<String> shardServerList, List<String> offlineShardServers) {
         DistributedLayoutManager layoutManager = new DistributedLayoutManager();
         layoutManager.setNodes(shardServerList);
         layoutManager.setNodesOffline(offlineShardServers);
@@ -201,6 +200,11 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
         layoutManager.init();
         return layoutManager;
       }
+
+      @Override
+      public DistributedLayout readCurrentLayout(String table) {
+        throw new RuntimeException("Not implemented");
+      }
     };
   }
 
@@ -474,18 +478,11 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
       dir = directory;
     }
 
-    BlurIndex index;
-
-    BlurNRTIndex writer = new BlurNRTIndex(shardContext, _mergeScheduler, dir, _gc, _searchExecutor);
-
-    // BlurIndexNRTSimple writer = new BlurIndexNRTSimple(shardContext,
-    // _mergeScheduler, dir, _gc, _searchExecutor,
-    // _indexCloser, _refresher);
+    BlurIndex index = tableContext.newInstanceBlurIndex(shardContext, dir, _mergeScheduler, _gc, _searchExecutor,
+        _indexCloser, _refresher);
 
     if (_clusterStatus.isReadOnly(true, _cluster, table)) {
-      index = new BlurIndexReadOnly(writer);
-    } else {
-      index = writer;
+      index = new BlurIndexReadOnly(index);
     }
     _filterCache.opening(table, shard, index);
     TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
@@ -598,7 +595,7 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
     }
 
     DistributedLayout layoutManager = _distributedLayoutFactory.createDistributedLayout(table, shardList,
-        shardServerList, offlineShardServers, false);
+        shardServerList, offlineShardServers);
 
     Map<String, String> layout = layoutManager.getLayout();
     String nodeName = getNodeName();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
index 51a7468..27960cd 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
@@ -156,7 +156,7 @@ public class LocalIndexServer extends AbstractIndexServer {
 
   private BlurIndex openIndex(String table, String shard, Directory dir) throws CorruptIndexException, IOException {
     ShardContext shardContext = ShardContext.create(_tableContext, shard);
-    BlurNRTIndex index = new BlurNRTIndex(shardContext, _mergeScheduler, dir, _gc, _searchExecutor);
+    BlurNRTIndex index = new BlurNRTIndex(shardContext, dir, _mergeScheduler, _gc, _searchExecutor, null, null);
     return index;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index 8100a58..49fe965 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -18,10 +18,13 @@ package org.apache.blur.manager.writer;
  */
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
 import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.lucene.index.IndexReader;
@@ -29,6 +32,7 @@ import org.apache.lucene.index.IndexReaderContext;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
 
 public abstract class BlurIndex {
 
@@ -37,6 +41,12 @@ public abstract class BlurIndex {
   private long _lastMemoryCheck = 0;
   private long _memoryUsage = 0;
 
+  public BlurIndex(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
+      DirectoryReferenceFileGC gc, ExecutorService searchExecutor, BlurIndexCloser indexCloser,
+      BlurIndexRefresher refresher) throws IOException {
+
+  }
+
   public abstract void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException;
 
   public abstract void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException;
@@ -50,11 +60,11 @@ public abstract class BlurIndex {
   public abstract AtomicBoolean isClosed();
 
   public abstract void optimize(int numberOfSegmentsPerShard) throws IOException;
-  
+
   public abstract void createSnapshot(String name) throws IOException;
-  
+
   public abstract void removeSnapshot(String name) throws IOException;
-  
+
   public abstract List<String> getSnapshots() throws IOException;
 
   public long getRecordCount() throws IOException {
@@ -86,29 +96,30 @@ public abstract class BlurIndex {
 
   public long getIndexMemoryUsage() throws IOException {
     return 0;
-//    long now = System.currentTimeMillis();
-//    if (_lastMemoryCheck + ONE_MINUTE > now) {
-//      return _memoryUsage;
-//    }
-//    IndexSearcherClosable searcher = getIndexReader();
-//    try {
-//      IndexReaderContext topReaderContext = searcher.getTopReaderContext();
-//      return _memoryUsage = RamUsageEstimator.sizeOf(topReaderContext, new ClassNameFilter() {
-//        @Override
-//        public boolean include(String className) {
-//          if (className.startsWith("org.apache.blur.index.ExitableReader")) {
-//            return true;
-//          } else if (className.startsWith("org.apache.blur.")) {
-//            // System.out.println("className [" + className + "]");
-//            return false;
-//          }
-//          return true;
-//        }
-//      });
-//    } finally {
-//      searcher.close();
-//      _lastMemoryCheck = System.currentTimeMillis();
-//    }
+    // long now = System.currentTimeMillis();
+    // if (_lastMemoryCheck + ONE_MINUTE > now) {
+    // return _memoryUsage;
+    // }
+    // IndexSearcherClosable searcher = getIndexReader();
+    // try {
+    // IndexReaderContext topReaderContext = searcher.getTopReaderContext();
+    // return _memoryUsage = RamUsageEstimator.sizeOf(topReaderContext, new
+    // ClassNameFilter() {
+    // @Override
+    // public boolean include(String className) {
+    // if (className.startsWith("org.apache.blur.index.ExitableReader")) {
+    // return true;
+    // } else if (className.startsWith("org.apache.blur.")) {
+    // // System.out.println("className [" + className + "]");
+    // return false;
+    // }
+    // return true;
+    // }
+    // });
+    // } finally {
+    // searcher.close();
+    // _lastMemoryCheck = System.currentTimeMillis();
+    // }
   }
 
   public long getSegmentCount() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
new file mode 100644
index 0000000..f35560a
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.analysis.FieldManager;
+import org.apache.blur.index.ExitableReader;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.codec.Blur022Codec;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.lucene.warmup.TraceableDirectory;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.BlurIndexWriter;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.Directory;
+
+public class BlurIndexNRTSimple extends BlurIndex {
+
+  private static final Log LOG = LogFactory.getLog(BlurIndexNRTSimple.class);
+
+  private final AtomicBoolean _isClosed = new AtomicBoolean();
+  private final BlurIndexCloser _indexCloser;
+  private final AtomicReference<DirectoryReader> _indexReader = new AtomicReference<DirectoryReader>();
+  private final ExecutorService _searchThreadPool;
+  private final Directory _directory;
+  private final Thread _writerOpener;
+  private final IndexWriterConfig _conf;
+  private final TableContext _tableContext;
+  private final FieldManager _fieldManager;
+  private final BlurIndexRefresher _refresher;
+  private final ShardContext _shardContext;
+  private final AtomicReference<BlurIndexWriter> _writer = new AtomicReference<BlurIndexWriter>();
+  private final boolean _makeReaderExitable = true;
+
+  public BlurIndexNRTSimple(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
+      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser indexCloser,
+      BlurIndexRefresher refresher) throws IOException {
+    super(shardContext, directory, mergeScheduler, gc, searchExecutor, indexCloser, refresher);
+    _searchThreadPool = searchExecutor;
+    _shardContext = shardContext;
+    _tableContext = _shardContext.getTableContext();
+    _fieldManager = _tableContext.getFieldManager();
+    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
+    _conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
+    _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
+    _conf.setCodec(new Blur022Codec(_tableContext.getBlurConfiguration()));
+    _conf.setSimilarity(_tableContext.getSimilarity());
+    AtomicBoolean stop = new AtomicBoolean();
+    _conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, stop, _isClosed));
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+    _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
+
+    if (!DirectoryReader.indexExists(directory)) {
+      new BlurIndexWriter(directory, _conf).close();
+    }
+    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory, gc);
+    // This directory allows for warm up by adding tracing ability.
+    TraceableDirectory dir = new TraceableDirectory(referenceCounter);
+    _directory = dir;
+
+    // _directory = directory;
+
+    _indexCloser = indexCloser;
+    _indexReader.set(wrap(DirectoryReader.open(_directory)));
+    _refresher = refresher;
+
+    _writerOpener = getWriterOpener(shardContext);
+    _writerOpener.start();
+    _refresher.register(this);
+  }
+
+  private DirectoryReader wrap(DirectoryReader reader) {
+    if (_makeReaderExitable) {
+      reader = new ExitableReader(reader);
+    }
+    return reader;
+  }
+
+  private Thread getWriterOpener(ShardContext shardContext) {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          _writer.set(new BlurIndexWriter(_directory, _conf.clone()));
+          synchronized (_writer) {
+            _writer.notify();
+          }
+        } catch (IOException e) {
+          LOG.error("Unknown error on index writer open.", e);
+        }
+      }
+    });
+    thread.setName("Writer Opener for Table [" + shardContext.getTableContext().getTable() + "] Shard ["
+        + shardContext.getShard() + "]");
+    thread.setDaemon(true);
+    return thread;
+  }
+
+  @Override
+  public IndexSearcherClosable getIndexSearcher() throws IOException {
+    final IndexReader indexReader = _indexReader.get();
+    while (!indexReader.tryIncRef()) {
+      // keep trying to increment the ref
+    }
+    return new IndexSearcherClosable(indexReader, _searchThreadPool) {
+      @Override
+      public Directory getDirectory() {
+        return _directory;
+      }
+
+      @Override
+      public void close() throws IOException {
+        indexReader.decRef();
+      }
+    };
+  }
+
+  @Override
+  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
+    waitUntilNotNull(_writer);
+    BlurIndexWriter writer = _writer.get();
+    List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
+    writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), docs);
+    waitToBeVisible(waitToBeVisible);
+  }
+
+  @Override
+  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
+    waitUntilNotNull(_writer);
+    BlurIndexWriter writer = _writer.get();
+    writer.deleteDocuments(TransactionRecorder.createRowId(rowId));
+    waitToBeVisible(waitToBeVisible);
+  }
+
+  private void waitUntilNotNull(AtomicReference<?> ref) {
+    while (true) {
+      Object object = ref.get();
+      if (object != null) {
+        return;
+      }
+      synchronized (ref) {
+        try {
+          ref.wait(TimeUnit.SECONDS.toMillis(1));
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    _isClosed.set(true);
+    IOUtils.cleanup(LOG, _writer.get());
+    IOUtils.cleanup(LOG, _indexReader.get());
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    DirectoryReader currentReader = _indexReader.get();
+    DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader);
+    if (newReader != null) {
+      LOG.info("Refreshing index for table [{0}] shard [{1}].", _tableContext.getTable(), _shardContext.getShard());
+      _indexReader.set(wrap(newReader));
+      _indexCloser.close(currentReader);
+    }
+  }
+
+  @Override
+  public AtomicBoolean isClosed() {
+    return _isClosed;
+  }
+
+  @Override
+  public void optimize(int numberOfSegmentsPerShard) throws IOException {
+    throw new RuntimeException("not impl");
+  }
+
+  @Override
+  public void createSnapshot(String name) throws IOException {
+    throw new RuntimeException("not impl");
+  }
+
+  @Override
+  public void removeSnapshot(String name) throws IOException {
+    throw new RuntimeException("not impl");
+  }
+
+  @Override
+  public List<String> getSnapshots() throws IOException {
+    throw new RuntimeException("not impl");
+  }
+
+  private void waitToBeVisible(boolean waitToBeVisible) throws IOException {
+    if (waitToBeVisible) {
+      waitUntilNotNull(_writer);
+      BlurIndexWriter writer = _writer.get();
+      writer.commit();
+      refresh();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
index ddd82e1..e8a3c32 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
@@ -27,7 +27,8 @@ public class BlurIndexReadOnly extends BlurIndex {
 
   private final BlurIndex _blurIndex;
 
-  public BlurIndexReadOnly(BlurIndex blurIndex) {
+  public BlurIndexReadOnly(BlurIndex blurIndex) throws IOException {
+    super(null, null, null, null, null, null, null);
     _blurIndex = blurIndex;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index 13effa3..db5301a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -20,6 +20,7 @@ import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.codec.Blur022Codec;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
 import org.apache.blur.lucene.warmup.TraceableDirectory;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
@@ -50,15 +52,17 @@ public class BlurIndexReader extends BlurIndex {
   private BlurIndexRefresher _refresher;
   private final TableContext _tableContext;
   private final ShardContext _shardContext;
-
-  public BlurIndexReader(ShardContext shardContext, Directory directory, BlurIndexRefresher refresher,
-      BlurIndexCloser closer) throws IOException {
+  
+  public BlurIndexReader(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
+      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser indexCloser,
+      BlurIndexRefresher refresher) throws IOException {
+    super(shardContext, directory, mergeScheduler, gc, searchExecutor, indexCloser, refresher);
     _tableContext = shardContext.getTableContext();
     // This directory allows for warm up by adding tracing ability.
     _directory = new TraceableDirectory(directory);
     _shardContext = shardContext;
     _refresher = refresher;
-    _closer = closer;
+    _closer = indexCloser;
 
     _open.set(true);
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index a74678b..2b9d38e 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -95,8 +95,10 @@ public class BlurNRTIndex extends BlurIndex {
   private final ReadWriteLock _lock = new ReentrantReadWriteLock();
   private long _lastRefresh = 0;
 
-  public BlurNRTIndex(ShardContext shardContext, SharedMergeScheduler mergeScheduler, Directory directory,
-      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor) throws IOException {
+  public BlurNRTIndex(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
+      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser indexCloser,
+      BlurIndexRefresher refresher) throws IOException {
+    super(shardContext, directory, mergeScheduler, gc, searchExecutor, indexCloser, refresher);
     _tableContext = shardContext.getTableContext();
     _directory = directory;
     _shardContext = shardContext;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index 3256404..0102a70 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -17,6 +17,7 @@ package org.apache.blur.server;
  * limitations under the License.
  */
 import static org.apache.blur.utils.BlurConstants.BLUR_FIELDTYPE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLURINDEX_CLASS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_SIMILARITY;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
@@ -24,11 +25,14 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRES
 import static org.apache.blur.utils.BlurConstants.SUPER;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.BlurConfiguration;
@@ -39,6 +43,12 @@ import org.apache.blur.analysis.NoStopWordStandardAnalyzer;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.manager.writer.BlurIndexCloser;
+import org.apache.blur.manager.writer.BlurIndexRefresher;
+import org.apache.blur.manager.writer.BlurNRTIndex;
+import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.thrift.generated.ScoreType;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
@@ -49,6 +59,7 @@ import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.similarities.Similarity;
+import org.apache.lucene.store.Directory;
 
 public class TableContext {
 
@@ -279,4 +290,42 @@ public class TableContext {
   public static void setSystemBlurConfiguration(BlurConfiguration systemBlurConfiguration) {
     TableContext.systemBlurConfiguration = systemBlurConfiguration;
   }
+
+  @SuppressWarnings("unchecked")
+  public BlurIndex newInstanceBlurIndex(ShardContext shardContext, Directory dir, SharedMergeScheduler mergeScheduler,
+      DirectoryReferenceFileGC gc, ExecutorService searchExecutor, BlurIndexCloser indexCloser,
+      BlurIndexRefresher refresher) throws IOException {
+
+    String className = blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS, BlurNRTIndex.class.getName());
+
+    Class<? extends BlurIndex> clazz;
+    try {
+      clazz = (Class<? extends BlurIndex>) Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+    Constructor<? extends BlurIndex> constructor = findConstructor(clazz);
+    try {
+      return constructor.newInstance(shardContext, dir, mergeScheduler, gc, searchExecutor, indexCloser, refresher);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    } catch (InvocationTargetException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private Constructor<? extends BlurIndex> findConstructor(Class<? extends BlurIndex> clazz) throws IOException {
+    try {
+      return clazz.getConstructor(new Class[] { ShardContext.class, Directory.class, SharedMergeScheduler.class,
+          DirectoryReferenceFileGC.class, ExecutorService.class, BlurIndexCloser.class, BlurIndexRefresher.class });
+    } catch (NoSuchMethodException e) {
+      throw new IOException(e);
+    } catch (SecurityException e) {
+      throw new IOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
new file mode 100644
index 0000000..cd528cb
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
@@ -0,0 +1,242 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.FSDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BlurIndexNRTSimple}: rows written through the index must be
+ * reflected in the document count reported by a searcher obtained from that
+ * same index.
+ */
+public class BlurIndexNRTSimpleTest {
+
+  /** Rows written by the test that checks the document count after every write. */
+  private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
+  /** Rows written by the bulk test, which checks the document count once at the end. */
+  private static final int TEST_NUMBER = 50000;
+
+  private static final File TMPDIR = new File("./target/tmp");
+
+  private BlurIndexNRTSimple writer;
+  private final Random random = new Random();
+  private ExecutorService service;
+  private File base;
+  private Configuration configuration;
+
+  private DirectoryReferenceFileGC gc;
+  private SharedMergeScheduler mergeScheduler;
+  private String uuid;
+  private BlurIndexRefresher _refresher;
+  private BlurIndexCloser _closer;
+
+  /**
+   * Starts every test from an empty working directory and fresh shared
+   * resources. The writer itself is created by {@link #setupWriter} inside
+   * each test.
+   */
+  @Before
+  public void setup() throws IOException {
+    TableContext.clear();
+    base = new File(TMPDIR, "blur-index-writer-test");
+    rm(base);
+    base.mkdirs();
+
+    mergeScheduler = new SharedMergeScheduler(1);
+    gc = new DirectoryReferenceFileGC();
+
+    configuration = new Configuration();
+    service = Executors.newThreadPool("test", 10);
+    _refresher = new BlurIndexRefresher();
+    _closer = new BlurIndexCloser();
+  }
+
+  /**
+   * Creates the {@link BlurIndexNRTSimple} under test and stores it in
+   * {@link #writer}.
+   *
+   * @param configuration
+   *          currently unused by this method.
+   * @param refresh
+   *          value stored in the "blur.shard.time.between.refreshs" table
+   *          property.
+   * @param reload
+   *          when true no new uuid is generated, so the new writer points at
+   *          the same location as the previous one; callers must close the
+   *          previous writer first.
+   */
+  private void setupWriter(Configuration configuration, long refresh, boolean reload) throws IOException {
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName("test-table");
+    /*
+     * if reload is set to true...we create a new writer instance pointing to
+     * the same location as the old one..... so previous writer instances should
+     * be closed
+     */
+
+    if (!reload && uuid == null) {
+      uuid = UUID.randomUUID().toString();
+    }
+
+    tableDescriptor.setTableUri(new File(base, "table-store-" + uuid).toURI().toString());
+    tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
+
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    File path = new File(base, "index_" + uuid);
+    path.mkdirs();
+    FSDirectory directory = FSDirectory.open(path);
+    ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
+    writer = new BlurIndexNRTSimple(shardContext, directory, mergeScheduler, gc, service, _closer, _refresher);
+  }
+
+  /** Releases everything created by {@link #setup()} and {@link #setupWriter}. */
+  @After
+  public void tearDown() throws IOException {
+    _refresher.close();
+    // setupWriter may have failed before assigning the writer; skip it so the
+    // remaining resources are still released and the original failure is not
+    // masked by a NullPointerException.
+    if (writer != null) {
+      writer.close();
+    }
+    mergeScheduler.close();
+    gc.close();
+    service.shutdownNow();
+    rm(base);
+  }
+
+  /** Recursively deletes the given file or directory; missing files are ignored. */
+  private void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      // listFiles() returns null if the directory cannot be read.
+      File[] children = file.listFiles();
+      if (children != null) {
+        for (File f : children) {
+          rm(f);
+        }
+      }
+    }
+    file.delete();
+  }
+
+  /**
+   * Returns the document count seen by a searcher freshly obtained from the
+   * writer. The searcher is always closed, even if reading the count fails.
+   */
+  private int visibleDocCount() throws IOException {
+    IndexSearcherClosable searcher = writer.getIndexSearcher();
+    try {
+      IndexReader reader = searcher.getIndexReader();
+      return reader.numDocs();
+    } finally {
+      searcher.close();
+    }
+  }
+
+  /**
+   * Writes rows one at a time and expects each row to be counted by a new
+   * searcher immediately after its write returns.
+   */
+  @Test
+  public void testBlurIndexWriter() throws IOException {
+    setupWriter(configuration, 5, false);
+    long s = System.nanoTime();
+    int total = 0;
+    for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
+      writer.replaceRow(true, true, genRow());
+      assertEquals(i + 1, visibleDocCount());
+      total++;
+    }
+    long e = System.nanoTime();
+    double seconds = (e - s) / 1000000000.0;
+    double rate = total / seconds;
+    System.out.println("Rate " + rate);
+    assertEquals(TEST_NUMBER_WAIT_VISIBLE, visibleDocCount());
+  }
+
+  /**
+   * Writes a large batch of rows, passing true as the first replaceRow
+   * argument only for the last row, then expects all rows to be counted after
+   * an explicit refresh.
+   */
+  @Test
+  public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
+    setupWriter(configuration, 100, false);
+    assertEquals(0, visibleDocCount());
+    long s = System.nanoTime();
+    int total = 0;
+    for (int i = 0; i < TEST_NUMBER; i++) {
+      if (i == TEST_NUMBER - 1) {
+        writer.replaceRow(true, true, genRow());
+      } else {
+        writer.replaceRow(false, true, genRow());
+      }
+      total++;
+    }
+    long e = System.nanoTime();
+    double seconds = (e - s) / 1000000000.0;
+    double rate = total / seconds;
+    System.out.println("Rate " + rate);
+    // Refresh explicitly rather than sleeping until a periodic refresh makes
+    // the rows visible.
+    writer.refresh();
+    assertEquals(TEST_NUMBER, visibleDocCount());
+  }
+
+  /** Builds a row with random ids holding one "testing" record of ten random columns. */
+  private Row genRow() {
+    Row row = new Row();
+    row.setId(Long.toString(random.nextLong()));
+    Record record = new Record();
+    record.setFamily("testing");
+    record.setRecordId(Long.toString(random.nextLong()));
+    for (int i = 0; i < 10; i++) {
+      record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
+    }
+    row.addToRecords(record);
+    return row;
+  }
+
+//  @Test
+//  public void testCreateSnapshot() throws IOException {
+//    setupWriter(configuration, 5, false);
+//    writer.createSnapshot("test_snapshot");
+//    assertTrue(writer.getSnapshots().contains("test_snapshot"));
+//    
+//    // check that the file is persisted
+//    Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
+//    FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
+//    Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot");
+//    assertTrue(fileSystem.exists(snapshotFilePath));
+//    
+//    // create a new writer instance and test whether the snapshots are loaded properly
+//    writer.close();
+//    setupWriter(configuration, 5, true);
+//    assertTrue(writer.getSnapshots().contains("test_snapshot"));
+//  }
+//  
+//  
+//  @Test
+//  public void testRemoveSnapshots() throws IOException {
+//    setupWriter(configuration, 5, false);
+//    Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
+//    FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
+//    fileSystem.mkdirs(snapshotsDirPath);
+//    
+//    // create 2 files in snapshots sub-dir
+//    Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
+//    Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
+//    
+//    BufferedWriter br1 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile1, true)));
+//    br1.write("segments_1");
+//    br1.close();
+//    
+//    BufferedWriter br2 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile2, true)));
+//    br2.write("segments_1");
+//    br2.close();
+//    
+//    // re-load the writer to load the snapshots
+//    writer.close();
+//    setupWriter(configuration, 5, true);
+//    assertEquals(writer.getSnapshots().size(), 2);
+//    
+//    
+//    writer.removeSnapshot("test_snapshot2");
+//    assertEquals(writer.getSnapshots().size(), 1);
+//    assertTrue(!writer.getSnapshots().contains("test_snapshot2"));
+//    assertTrue(!fileSystem.exists(snapshotFile2));
+//
+//  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
index 01762b1..cb7d649 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
@@ -90,7 +90,7 @@ public class BlurIndexReaderTest {
     ShardContext shardContext = ShardContext.create(tableContext, "test-shard");
     refresher = new BlurIndexRefresher();
     indexCloser = new BlurIndexCloser();
-    reader = new BlurIndexReader(shardContext, directory, refresher, indexCloser);
+    reader = new BlurIndexReader(shardContext, directory, null, null, null, indexCloser,
refresher);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
index 5f40fac..9f2ffc3 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
@@ -81,15 +81,15 @@ public class BlurNRTIndexTest {
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setName("test-table");
     /*
-     * if reload is set to true...we create a new writer instance pointing
-     * to the same location as the old one.....
-     * so previous writer instances should be closed
+     * if reload is set to true...we create a new writer instance pointing to
+     * the same location as the old one..... so previous writer instances should
+     * be closed
      */
-    
+
     if (!reload && uuid == null) {
       uuid = UUID.randomUUID().toString();
     }
-    
+
     tableDescriptor.setTableUri(new File(base, "table-store-" + uuid).toURI().toString());
     tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
 
@@ -98,7 +98,7 @@ public class BlurNRTIndexTest {
     path.mkdirs();
     FSDirectory directory = FSDirectory.open(path);
     ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
-    writer = new BlurNRTIndex(shardContext, mergeScheduler, directory, gc, service);
+    writer = new BlurNRTIndex(shardContext, directory, mergeScheduler, gc, service, null,
null);
   }
 
   @After
@@ -194,45 +194,44 @@ public class BlurNRTIndexTest {
     setupWriter(configuration, 5, false);
     writer.createSnapshot("test_snapshot");
     assertTrue(writer.getSnapshots().contains("test_snapshot"));
-    
+
     // check that the file is persisted
     Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
     FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
     Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot");
     assertTrue(fileSystem.exists(snapshotFilePath));
-    
-    // create a new writer instance and test whether the snapshots are loaded properly
+
+    // create a new writer instance and test whether the snapshots are loaded
+    // properly
     writer.close();
     setupWriter(configuration, 5, true);
     assertTrue(writer.getSnapshots().contains("test_snapshot"));
   }
-  
-  
+
   @Test
   public void testRemoveSnapshots() throws IOException {
     setupWriter(configuration, 5, false);
     Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
     FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
     fileSystem.mkdirs(snapshotsDirPath);
-    
+
     // create 2 files in snapshots sub-dir
     Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
     Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
-    
+
     BufferedWriter br1 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile1,
true)));
     br1.write("segments_1");
     br1.close();
-    
+
     BufferedWriter br2 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile2,
true)));
     br2.write("segments_1");
     br2.close();
-    
+
     // re-load the writer to load the snpshots
     writer.close();
     setupWriter(configuration, 5, true);
     assertEquals(writer.getSnapshots().size(), 2);
-    
-    
+
     writer.removeSnapshot("test_snapshot2");
     assertEquals(writer.getSnapshots().size(), 1);
     assertTrue(!writer.getSnapshots().contains("test_snapshot2"));

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 72bf66d..1876519 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -181,6 +181,9 @@ blur.gui.shard.port=40090
 # To intercept the calls made to the shard server and perform server side changes to the
calls extend org.apache.blur.server.FilteredBlurServer.
 blur.shard.filtered.server.class=
 
+# Defines the blur index class to be used to handle index requests.  This class has to extend
org.apache.blur.manager.writer.BlurIndex.  This can be defined globally as well as per table.
+blur.shard.blurindex.class=
+
 
 ### Controller Server Configuration
 


Mime
View raw message