incubator-blur-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aaron McCurry <amccu...@gmail.com>
Subject Re: [1/2] git commit: Adding a way to change the BlurIndex implementation for a cluster or a table.
Date Fri, 13 Dec 2013 03:51:04 GMT
The BlurNRTIndex has some serious issues when it's being hit by a lot of
threads for updates and searching.  There are very long blocking when the
index commits and has to roll the write ahead log.  I have started a
simpler version of the BlurIndex that allows for updates but doesn't have a
write ahead log yet, but I had no way to even try it without patching Blur.
 Also there has been a lot of discussion in jira on BLUR-290 that would
involve a new implementation of the BlurIndex.  So this is just a bit code
to allow for different implementations.

Aaron


On Thu, Dec 12, 2013 at 8:15 PM, Tim Williams <williamstw@gmail.com> wrote:

> Interesting, do you have a specific use-case for this you can share?
>
> Thanks,
> --tim
>
> On Thu, Dec 12, 2013 at 10:25 AM,  <amccurry@apache.org> wrote:
> > Updated Branches:
> >   refs/heads/apache-blur-0.2 a4169a999 -> 7ff903332
> >
> >
> > Adding a way to change the BlurIndex implementation for a cluster or a
> table.
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
> > Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/3dc5b842
> > Tree:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/3dc5b842
> > Diff:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/3dc5b842
> >
> > Branch: refs/heads/apache-blur-0.2
> > Commit: 3dc5b842ba4a3b836adf035206c7b157cf73e0e9
> > Parents: a4169a9
> > Author: Aaron McCurry <amccurry@gmail.com>
> > Authored: Thu Dec 12 10:24:55 2013 -0500
> > Committer: Aaron McCurry <amccurry@gmail.com>
> > Committed: Thu Dec 12 10:24:55 2013 -0500
> >
> > ----------------------------------------------------------------------
> >  .../indexserver/DistributedIndexServer.java     |  23 +-
> >  .../manager/indexserver/LocalIndexServer.java   |   2 +-
> >  .../apache/blur/manager/writer/BlurIndex.java   |  63 +++--
> >  .../blur/manager/writer/BlurIndexNRTSimple.java | 237 ++++++++++++++++++
> >  .../blur/manager/writer/BlurIndexReadOnly.java  |   3 +-
> >  .../blur/manager/writer/BlurIndexReader.java    |  12 +-
> >  .../blur/manager/writer/BlurNRTIndex.java       |   6 +-
> >  .../org/apache/blur/server/TableContext.java    |  49 ++++
> >  .../manager/writer/BlurIndexNRTSimpleTest.java  | 242
> +++++++++++++++++++
> >  .../manager/writer/BlurIndexReaderTest.java     |   2 +-
> >  .../blur/manager/writer/BlurNRTIndexTest.java   |  33 ++-
> >  .../src/main/resources/blur-default.properties  |   3 +
> >  12 files changed, 610 insertions(+), 65 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> > index f4a69ac..de45d29 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> > @@ -48,7 +48,6 @@ import org.apache.blur.manager.writer.BlurIndex;
> >  import org.apache.blur.manager.writer.BlurIndexCloser;
> >  import org.apache.blur.manager.writer.BlurIndexReadOnly;
> >  import org.apache.blur.manager.writer.BlurIndexRefresher;
> > -import org.apache.blur.manager.writer.BlurNRTIndex;
> >  import org.apache.blur.manager.writer.SharedMergeScheduler;
> >  import org.apache.blur.server.IndexSearcherClosable;
> >  import org.apache.blur.server.ShardContext;
> > @@ -193,7 +192,7 @@ public class DistributedIndexServer extends
> AbstractDistributedIndexServer {
> >
> >        @Override
> >        public DistributedLayout createDistributedLayout(String table,
> List<String> shardList,
> > -          List<String> shardServerList, List<String>
> offlineShardServers, boolean readOnly) {
> > +          List<String> shardServerList, List<String>
> offlineShardServers) {
> >          DistributedLayoutManager layoutManager = new
> DistributedLayoutManager();
> >          layoutManager.setNodes(shardServerList);
> >          layoutManager.setNodesOffline(offlineShardServers);
> > @@ -201,6 +200,11 @@ public class DistributedIndexServer extends
> AbstractDistributedIndexServer {
> >          layoutManager.init();
> >          return layoutManager;
> >        }
> > +
> > +      @Override
> > +      public DistributedLayout readCurrentLayout(String table) {
> > +        throw new RuntimeException("Not implemented");
> > +      }
> >      };
> >    }
> >
> > @@ -474,18 +478,11 @@ public class DistributedIndexServer extends
> AbstractDistributedIndexServer {
> >        dir = directory;
> >      }
> >
> > -    BlurIndex index;
> > -
> > -    BlurNRTIndex writer = new BlurNRTIndex(shardContext,
> _mergeScheduler, dir, _gc, _searchExecutor);
> > -
> > -    // BlurIndexNRTSimple writer = new BlurIndexNRTSimple(shardContext,
> > -    // _mergeScheduler, dir, _gc, _searchExecutor,
> > -    // _indexCloser, _refresher);
> > +    BlurIndex index = tableContext.newInstanceBlurIndex(shardContext,
> dir, _mergeScheduler, _gc, _searchExecutor,
> > +        _indexCloser, _refresher);
> >
> >      if (_clusterStatus.isReadOnly(true, _cluster, table)) {
> > -      index = new BlurIndexReadOnly(writer);
> > -    } else {
> > -      index = writer;
> > +      index = new BlurIndexReadOnly(index);
> >      }
> >      _filterCache.opening(table, shard, index);
> >      TableDescriptor tableDescriptor =
> _clusterStatus.getTableDescriptor(true, _cluster, table);
> > @@ -598,7 +595,7 @@ public class DistributedIndexServer extends
> AbstractDistributedIndexServer {
> >      }
> >
> >      DistributedLayout layoutManager =
> _distributedLayoutFactory.createDistributedLayout(table, shardList,
> > -        shardServerList, offlineShardServers, false);
> > +        shardServerList, offlineShardServers);
> >
> >      Map<String, String> layout = layoutManager.getLayout();
> >      String nodeName = getNodeName();
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> > index 51a7468..27960cd 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> > @@ -156,7 +156,7 @@ public class LocalIndexServer extends
> AbstractIndexServer {
> >
> >    private BlurIndex openIndex(String table, String shard, Directory
> dir) throws CorruptIndexException, IOException {
> >      ShardContext shardContext = ShardContext.create(_tableContext,
> shard);
> > -    BlurNRTIndex index = new BlurNRTIndex(shardContext,
> _mergeScheduler, dir, _gc, _searchExecutor);
> > +    BlurNRTIndex index = new BlurNRTIndex(shardContext, dir,
> _mergeScheduler, _gc, _searchExecutor, null, null);
> >      return index;
> >    }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > index 8100a58..49fe965 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > @@ -18,10 +18,13 @@ package org.apache.blur.manager.writer;
> >   */
> >  import java.io.IOException;
> >  import java.util.List;
> > +import java.util.concurrent.ExecutorService;
> >  import java.util.concurrent.TimeUnit;
> >  import java.util.concurrent.atomic.AtomicBoolean;
> >
> > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
> >  import org.apache.blur.server.IndexSearcherClosable;
> > +import org.apache.blur.server.ShardContext;
> >  import org.apache.blur.thrift.generated.Row;
> >  import org.apache.blur.utils.BlurUtil;
> >  import org.apache.lucene.index.IndexReader;
> > @@ -29,6 +32,7 @@ import org.apache.lucene.index.IndexReaderContext;
> >  import org.apache.lucene.search.IndexSearcher;
> >  import org.apache.lucene.search.TermQuery;
> >  import org.apache.lucene.search.TopDocs;
> > +import org.apache.lucene.store.Directory;
> >
> >  public abstract class BlurIndex {
> >
> > @@ -37,6 +41,12 @@ public abstract class BlurIndex {
> >    private long _lastMemoryCheck = 0;
> >    private long _memoryUsage = 0;
> >
> > +  public BlurIndex(ShardContext shardContext, Directory directory,
> SharedMergeScheduler mergeScheduler,
> > +      DirectoryReferenceFileGC gc, ExecutorService searchExecutor,
> BlurIndexCloser indexCloser,
> > +      BlurIndexRefresher refresher) throws IOException {
> > +
> > +  }
> > +
> >    public abstract void replaceRow(boolean waitToBeVisible, boolean wal,
> Row row) throws IOException;
> >
> >    public abstract void deleteRow(boolean waitToBeVisible, boolean wal,
> String rowId) throws IOException;
> > @@ -50,11 +60,11 @@ public abstract class BlurIndex {
> >    public abstract AtomicBoolean isClosed();
> >
> >    public abstract void optimize(int numberOfSegmentsPerShard) throws
> IOException;
> > -
> > +
> >    public abstract void createSnapshot(String name) throws IOException;
> > -
> > +
> >    public abstract void removeSnapshot(String name) throws IOException;
> > -
> > +
> >    public abstract List<String> getSnapshots() throws IOException;
> >
> >    public long getRecordCount() throws IOException {
> > @@ -86,29 +96,30 @@ public abstract class BlurIndex {
> >
> >    public long getIndexMemoryUsage() throws IOException {
> >      return 0;
> > -//    long now = System.currentTimeMillis();
> > -//    if (_lastMemoryCheck + ONE_MINUTE > now) {
> > -//      return _memoryUsage;
> > -//    }
> > -//    IndexSearcherClosable searcher = getIndexReader();
> > -//    try {
> > -//      IndexReaderContext topReaderContext =
> searcher.getTopReaderContext();
> > -//      return _memoryUsage =
> RamUsageEstimator.sizeOf(topReaderContext, new ClassNameFilter() {
> > -//        @Override
> > -//        public boolean include(String className) {
> > -//          if
> (className.startsWith("org.apache.blur.index.ExitableReader")) {
> > -//            return true;
> > -//          } else if (className.startsWith("org.apache.blur.")) {
> > -//            // System.out.println("className [" + className + "]");
> > -//            return false;
> > -//          }
> > -//          return true;
> > -//        }
> > -//      });
> > -//    } finally {
> > -//      searcher.close();
> > -//      _lastMemoryCheck = System.currentTimeMillis();
> > -//    }
> > +    // long now = System.currentTimeMillis();
> > +    // if (_lastMemoryCheck + ONE_MINUTE > now) {
> > +    // return _memoryUsage;
> > +    // }
> > +    // IndexSearcherClosable searcher = getIndexReader();
> > +    // try {
> > +    // IndexReaderContext topReaderContext =
> searcher.getTopReaderContext();
> > +    // return _memoryUsage = RamUsageEstimator.sizeOf(topReaderContext,
> new
> > +    // ClassNameFilter() {
> > +    // @Override
> > +    // public boolean include(String className) {
> > +    // if
> (className.startsWith("org.apache.blur.index.ExitableReader")) {
> > +    // return true;
> > +    // } else if (className.startsWith("org.apache.blur.")) {
> > +    // // System.out.println("className [" + className + "]");
> > +    // return false;
> > +    // }
> > +    // return true;
> > +    // }
> > +    // });
> > +    // } finally {
> > +    // searcher.close();
> > +    // _lastMemoryCheck = System.currentTimeMillis();
> > +    // }
> >    }
> >
> >    public long getSegmentCount() throws IOException {
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
> > new file mode 100644
> > index 0000000..f35560a
> > --- /dev/null
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexNRTSimple.java
> > @@ -0,0 +1,237 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.blur.manager.writer;
> > +
> > +import static
> org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
> > +
> > +import java.io.IOException;
> > +import java.util.List;
> > +import java.util.concurrent.ExecutorService;
> > +import java.util.concurrent.TimeUnit;
> > +import java.util.concurrent.atomic.AtomicBoolean;
> > +import java.util.concurrent.atomic.AtomicReference;
> > +
> > +import org.apache.blur.analysis.FieldManager;
> > +import org.apache.blur.index.ExitableReader;
> > +import org.apache.blur.log.Log;
> > +import org.apache.blur.log.LogFactory;
> > +import org.apache.blur.lucene.codec.Blur022Codec;
> > +import
> org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
> > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
> > +import org.apache.blur.lucene.warmup.TraceableDirectory;
> > +import org.apache.blur.server.IndexSearcherClosable;
> > +import org.apache.blur.server.ShardContext;
> > +import org.apache.blur.server.TableContext;
> > +import org.apache.blur.thrift.generated.Row;
> > +import org.apache.hadoop.io.IOUtils;
> > +import org.apache.lucene.analysis.Analyzer;
> > +import org.apache.lucene.document.Field;
> > +import org.apache.lucene.index.BlurIndexWriter;
> > +import org.apache.lucene.index.DirectoryReader;
> > +import org.apache.lucene.index.IndexReader;
> > +import org.apache.lucene.index.IndexWriterConfig;
> > +import org.apache.lucene.index.TieredMergePolicy;
> > +import org.apache.lucene.store.Directory;
> > +
> > +public class BlurIndexNRTSimple extends BlurIndex {
> > +
> > +  private static final Log LOG =
> LogFactory.getLog(BlurIndexNRTSimple.class);
> > +
> > +  private final AtomicBoolean _isClosed = new AtomicBoolean();
> > +  private final BlurIndexCloser _indexCloser;
> > +  private final AtomicReference<DirectoryReader> _indexReader = new
> AtomicReference<DirectoryReader>();
> > +  private final ExecutorService _searchThreadPool;
> > +  private final Directory _directory;
> > +  private final Thread _writerOpener;
> > +  private final IndexWriterConfig _conf;
> > +  private final TableContext _tableContext;
> > +  private final FieldManager _fieldManager;
> > +  private final BlurIndexRefresher _refresher;
> > +  private final ShardContext _shardContext;
> > +  private final AtomicReference<BlurIndexWriter> _writer = new
> AtomicReference<BlurIndexWriter>();
> > +  private final boolean _makeReaderExitable = true;
> > +
> > +  public BlurIndexNRTSimple(ShardContext shardContext, Directory
> directory, SharedMergeScheduler mergeScheduler,
> > +      DirectoryReferenceFileGC gc, final ExecutorService
> searchExecutor, BlurIndexCloser indexCloser,
> > +      BlurIndexRefresher refresher) throws IOException {
> > +    super(shardContext, directory, mergeScheduler, gc, searchExecutor,
> indexCloser, refresher);
> > +    _searchThreadPool = searchExecutor;
> > +    _shardContext = shardContext;
> > +    _tableContext = _shardContext.getTableContext();
> > +    _fieldManager = _tableContext.getFieldManager();
> > +    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
> > +    _conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
> > +    _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
> > +    _conf.setCodec(new
> Blur022Codec(_tableContext.getBlurConfiguration()));
> > +    _conf.setSimilarity(_tableContext.getSimilarity());
> > +    AtomicBoolean stop = new AtomicBoolean();
> > +    _conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext,
> stop, _isClosed));
> > +    TieredMergePolicy mergePolicy = (TieredMergePolicy)
> _conf.getMergePolicy();
> > +    mergePolicy.setUseCompoundFile(false);
> > +    _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
> > +
> > +    if (!DirectoryReader.indexExists(directory)) {
> > +      new BlurIndexWriter(directory, _conf).close();
> > +    }
> > +    DirectoryReferenceCounter referenceCounter = new
> DirectoryReferenceCounter(directory, gc);
> > +    // This directory allows for warm up by adding tracing ability.
> > +    TraceableDirectory dir = new TraceableDirectory(referenceCounter);
> > +    _directory = dir;
> > +
> > +    // _directory = directory;
> > +
> > +    _indexCloser = indexCloser;
> > +    _indexReader.set(wrap(DirectoryReader.open(_directory)));
> > +    _refresher = refresher;
> > +
> > +    _writerOpener = getWriterOpener(shardContext);
> > +    _writerOpener.start();
> > +    _refresher.register(this);
> > +  }
> > +
> > +  private DirectoryReader wrap(DirectoryReader reader) {
> > +    if (_makeReaderExitable) {
> > +      reader = new ExitableReader(reader);
> > +    }
> > +    return reader;
> > +  }
> > +
> > +  private Thread getWriterOpener(ShardContext shardContext) {
> > +    Thread thread = new Thread(new Runnable() {
> > +      @Override
> > +      public void run() {
> > +        try {
> > +          _writer.set(new BlurIndexWriter(_directory, _conf.clone()));
> > +          synchronized (_writer) {
> > +            _writer.notify();
> > +          }
> > +        } catch (IOException e) {
> > +          LOG.error("Unknown error on index writer open.", e);
> > +        }
> > +      }
> > +    });
> > +    thread.setName("Writer Opener for Table [" +
> shardContext.getTableContext().getTable() + "] Shard ["
> > +        + shardContext.getShard() + "]");
> > +    thread.setDaemon(true);
> > +    return thread;
> > +  }
> > +
> > +  @Override
> > +  public IndexSearcherClosable getIndexSearcher() throws IOException {
> > +    final IndexReader indexReader = _indexReader.get();
> > +    while (!indexReader.tryIncRef()) {
> > +      // keep trying to increment the ref
> > +    }
> > +    return new IndexSearcherClosable(indexReader, _searchThreadPool) {
> > +      @Override
> > +      public Directory getDirectory() {
> > +        return _directory;
> > +      }
> > +
> > +      @Override
> > +      public void close() throws IOException {
> > +        indexReader.decRef();
> > +      }
> > +    };
> > +  }
> > +
> > +  @Override
> > +  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row)
> throws IOException {
> > +    waitUntilNotNull(_writer);
> > +    BlurIndexWriter writer = _writer.get();
> > +    List<List<Field>> docs = TransactionRecorder.getDocs(row,
> _fieldManager);
> > +
>  writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), docs);
> > +    waitToBeVisible(waitToBeVisible);
> > +  }
> > +
> > +  @Override
> > +  public void deleteRow(boolean waitToBeVisible, boolean wal, String
> rowId) throws IOException {
> > +    waitUntilNotNull(_writer);
> > +    BlurIndexWriter writer = _writer.get();
> > +    writer.deleteDocuments(TransactionRecorder.createRowId(rowId));
> > +    waitToBeVisible(waitToBeVisible);
> > +  }
> > +
> > +  private void waitUntilNotNull(AtomicReference<?> ref) {
> > +    while (true) {
> > +      Object object = ref.get();
> > +      if (object != null) {
> > +        return;
> > +      }
> > +      synchronized (ref) {
> > +        try {
> > +          ref.wait(TimeUnit.SECONDS.toMillis(1));
> > +        } catch (InterruptedException e) {
> > +          return;
> > +        }
> > +      }
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void close() throws IOException {
> > +    _isClosed.set(true);
> > +    IOUtils.cleanup(LOG, _writer.get());
> > +    IOUtils.cleanup(LOG, _indexReader.get());
> > +  }
> > +
> > +  @Override
> > +  public void refresh() throws IOException {
> > +    DirectoryReader currentReader = _indexReader.get();
> > +    DirectoryReader newReader =
> DirectoryReader.openIfChanged(currentReader);
> > +    if (newReader != null) {
> > +      LOG.info("Refreshing index for table [{0}] shard [{1}].",
> _tableContext.getTable(), _shardContext.getShard());
> > +      _indexReader.set(wrap(newReader));
> > +      _indexCloser.close(currentReader);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public AtomicBoolean isClosed() {
> > +    return _isClosed;
> > +  }
> > +
> > +  @Override
> > +  public void optimize(int numberOfSegmentsPerShard) throws IOException
> {
> > +    throw new RuntimeException("not impl");
> > +  }
> > +
> > +  @Override
> > +  public void createSnapshot(String name) throws IOException {
> > +    throw new RuntimeException("not impl");
> > +  }
> > +
> > +  @Override
> > +  public void removeSnapshot(String name) throws IOException {
> > +    throw new RuntimeException("not impl");
> > +  }
> > +
> > +  @Override
> > +  public List<String> getSnapshots() throws IOException {
> > +    throw new RuntimeException("not impl");
> > +  }
> > +
> > +  private void waitToBeVisible(boolean waitToBeVisible) throws
> IOException {
> > +    if (waitToBeVisible) {
> > +      waitUntilNotNull(_writer);
> > +      BlurIndexWriter writer = _writer.get();
> > +      writer.commit();
> > +      refresh();
> > +    }
> > +  }
> > +
> > +}
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > index ddd82e1..e8a3c32 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > @@ -27,7 +27,8 @@ public class BlurIndexReadOnly extends BlurIndex {
> >
> >    private final BlurIndex _blurIndex;
> >
> > -  public BlurIndexReadOnly(BlurIndex blurIndex) {
> > +  public BlurIndexReadOnly(BlurIndex blurIndex) throws IOException {
> > +    super(null, null, null, null, null, null, null);
> >      _blurIndex = blurIndex;
> >    }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
> > index 13effa3..db5301a 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
> > @@ -20,6 +20,7 @@ import static
> org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
> >
> >  import java.io.IOException;
> >  import java.util.List;
> > +import java.util.concurrent.ExecutorService;
> >  import java.util.concurrent.TimeUnit;
> >  import java.util.concurrent.atomic.AtomicBoolean;
> >  import java.util.concurrent.atomic.AtomicReference;
> > @@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
> >  import org.apache.blur.log.Log;
> >  import org.apache.blur.log.LogFactory;
> >  import org.apache.blur.lucene.codec.Blur022Codec;
> > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
> >  import org.apache.blur.lucene.warmup.TraceableDirectory;
> >  import org.apache.blur.server.IndexSearcherClosable;
> >  import org.apache.blur.server.ShardContext;
> > @@ -50,15 +52,17 @@ public class BlurIndexReader extends BlurIndex {
> >    private BlurIndexRefresher _refresher;
> >    private final TableContext _tableContext;
> >    private final ShardContext _shardContext;
> > -
> > -  public BlurIndexReader(ShardContext shardContext, Directory
> directory, BlurIndexRefresher refresher,
> > -      BlurIndexCloser closer) throws IOException {
> > +
> > +  public BlurIndexReader(ShardContext shardContext, Directory
> directory, SharedMergeScheduler mergeScheduler,
> > +      DirectoryReferenceFileGC gc, final ExecutorService
> searchExecutor, BlurIndexCloser indexCloser,
> > +      BlurIndexRefresher refresher) throws IOException {
> > +    super(shardContext, directory, mergeScheduler, gc, searchExecutor,
> indexCloser, refresher);
> >      _tableContext = shardContext.getTableContext();
> >      // This directory allows for warm up by adding tracing ability.
> >      _directory = new TraceableDirectory(directory);
> >      _shardContext = shardContext;
> >      _refresher = refresher;
> > -    _closer = closer;
> > +    _closer = indexCloser;
> >
> >      _open.set(true);
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
> > index a74678b..2b9d38e 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
> > @@ -95,8 +95,10 @@ public class BlurNRTIndex extends BlurIndex {
> >    private final ReadWriteLock _lock = new ReentrantReadWriteLock();
> >    private long _lastRefresh = 0;
> >
> > -  public BlurNRTIndex(ShardContext shardContext, SharedMergeScheduler
> mergeScheduler, Directory directory,
> > -      DirectoryReferenceFileGC gc, final ExecutorService
> searchExecutor) throws IOException {
> > +  public BlurNRTIndex(ShardContext shardContext, Directory directory,
> SharedMergeScheduler mergeScheduler,
> > +      DirectoryReferenceFileGC gc, final ExecutorService
> searchExecutor, BlurIndexCloser indexCloser,
> > +      BlurIndexRefresher refresher) throws IOException {
> > +    super(shardContext, directory, mergeScheduler, gc, searchExecutor,
> indexCloser, refresher);
> >      _tableContext = shardContext.getTableContext();
> >      _directory = directory;
> >      _shardContext = shardContext;
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/main/java/org/apache/blur/server/TableContext.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
> b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
> > index 3256404..0102a70 100644
> > --- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
> > +++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
> > @@ -17,6 +17,7 @@ package org.apache.blur.server;
> >   * limitations under the License.
> >   */
> >  import static org.apache.blur.utils.BlurConstants.BLUR_FIELDTYPE;
> > +import static
> org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLURINDEX_CLASS;
> >  import static
> org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE;
> >  import static
> org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_SIMILARITY;
> >  import static
> org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
> > @@ -24,11 +25,14 @@ import static
> org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRES
> >  import static org.apache.blur.utils.BlurConstants.SUPER;
> >
> >  import java.io.IOException;
> > +import java.lang.reflect.Constructor;
> > +import java.lang.reflect.InvocationTargetException;
> >  import java.util.HashMap;
> >  import java.util.Map;
> >  import java.util.Map.Entry;
> >  import java.util.Set;
> >  import java.util.concurrent.ConcurrentHashMap;
> > +import java.util.concurrent.ExecutorService;
> >  import java.util.concurrent.TimeUnit;
> >
> >  import org.apache.blur.BlurConfiguration;
> > @@ -39,6 +43,12 @@ import
> org.apache.blur.analysis.NoStopWordStandardAnalyzer;
> >  import org.apache.blur.log.Log;
> >  import org.apache.blur.log.LogFactory;
> >  import org.apache.blur.lucene.search.FairSimilarity;
> > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
> > +import org.apache.blur.manager.writer.BlurIndex;
> > +import org.apache.blur.manager.writer.BlurIndexCloser;
> > +import org.apache.blur.manager.writer.BlurIndexRefresher;
> > +import org.apache.blur.manager.writer.BlurNRTIndex;
> > +import org.apache.blur.manager.writer.SharedMergeScheduler;
> >  import org.apache.blur.thrift.generated.ScoreType;
> >  import org.apache.blur.thrift.generated.TableDescriptor;
> >  import org.apache.blur.utils.BlurConstants;
> > @@ -49,6 +59,7 @@ import org.apache.lucene.index.IndexDeletionPolicy;
> >  import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
> >  import org.apache.lucene.index.Term;
> >  import org.apache.lucene.search.similarities.Similarity;
> > +import org.apache.lucene.store.Directory;
> >
> >  public class TableContext {
> >
> > @@ -279,4 +290,42 @@ public class TableContext {
> >    public static void setSystemBlurConfiguration(BlurConfiguration
> systemBlurConfiguration) {
> >      TableContext.systemBlurConfiguration = systemBlurConfiguration;
> >    }
> > +
> > +  @SuppressWarnings("unchecked")
> > +  public BlurIndex newInstanceBlurIndex(ShardContext shardContext,
> Directory dir, SharedMergeScheduler mergeScheduler,
> > +      DirectoryReferenceFileGC gc, ExecutorService searchExecutor,
> BlurIndexCloser indexCloser,
> > +      BlurIndexRefresher refresher) throws IOException {
> > +
> > +    String className =
> blurConfiguration.get(BLUR_SHARD_BLURINDEX_CLASS,
> BlurNRTIndex.class.getName());
> > +
> > +    Class<? extends BlurIndex> clazz;
> > +    try {
> > +      clazz = (Class<? extends BlurIndex>) Class.forName(className);
> > +    } catch (ClassNotFoundException e) {
> > +      throw new IOException(e);
> > +    }
> > +    Constructor<? extends BlurIndex> constructor =
> findConstructor(clazz);
> > +    try {
> > +      return constructor.newInstance(shardContext, dir, mergeScheduler,
> gc, searchExecutor, indexCloser, refresher);
> > +    } catch (InstantiationException e) {
> > +      throw new IOException(e);
> > +    } catch (IllegalAccessException e) {
> > +      throw new IOException(e);
> > +    } catch (IllegalArgumentException e) {
> > +      throw new IOException(e);
> > +    } catch (InvocationTargetException e) {
> > +      throw new IOException(e);
> > +    }
> > +  }
> > +
> > +  private Constructor<? extends BlurIndex> findConstructor(Class<?
> extends BlurIndex> clazz) throws IOException {
> > +    try {
> > +      return clazz.getConstructor(new Class[] { ShardContext.class,
> Directory.class, SharedMergeScheduler.class,
> > +          DirectoryReferenceFileGC.class, ExecutorService.class,
> BlurIndexCloser.class, BlurIndexRefresher.class });
> > +    } catch (NoSuchMethodException e) {
> > +      throw new IOException(e);
> > +    } catch (SecurityException e) {
> > +      throw new IOException(e);
> > +    }
> > +  }
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
> b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
> > new file mode 100644
> > index 0000000..cd528cb
> > --- /dev/null
> > +++
> b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexNRTSimpleTest.java
> > @@ -0,0 +1,242 @@
> > +package org.apache.blur.manager.writer;
> > +
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +import static org.junit.Assert.assertEquals;
> > +
> > +import java.io.File;
> > +import java.io.IOException;
> > +import java.util.Random;
> > +import java.util.UUID;
> > +import java.util.concurrent.ExecutorService;
> > +
> > +import org.apache.blur.concurrent.Executors;
> > +import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
> > +import org.apache.blur.server.IndexSearcherClosable;
> > +import org.apache.blur.server.ShardContext;
> > +import org.apache.blur.server.TableContext;
> > +import org.apache.blur.thrift.generated.Column;
> > +import org.apache.blur.thrift.generated.Record;
> > +import org.apache.blur.thrift.generated.Row;
> > +import org.apache.blur.thrift.generated.TableDescriptor;
> > +import org.apache.hadoop.conf.Configuration;
> > +import org.apache.lucene.index.IndexReader;
> > +import org.apache.lucene.store.FSDirectory;
> > +import org.junit.After;
> > +import org.junit.Before;
> > +import org.junit.Test;
> > +
> > +public class BlurIndexNRTSimpleTest {
> > +
> > +  private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
> > +  private static final int TEST_NUMBER = 50000;
> > +
> > +  private static final File TMPDIR = new File("./target/tmp");
> > +
> > +  private BlurIndexNRTSimple writer;
> > +  private Random random = new Random();
> > +  private ExecutorService service;
> > +  private File base;
> > +  private Configuration configuration;
> > +
> > +  private DirectoryReferenceFileGC gc;
> > +  private SharedMergeScheduler mergeScheduler;
> > +  private String uuid;
> > +  private BlurIndexRefresher _refresher;
> > +  private BlurIndexCloser _closer;
> > +
> > +  @Before
> > +  public void setup() throws IOException {
> > +    TableContext.clear();
> > +    base = new File(TMPDIR, "blur-index-writer-test");
> > +    rm(base);
> > +    base.mkdirs();
> > +
> > +    mergeScheduler = new SharedMergeScheduler(1);
> > +    gc = new DirectoryReferenceFileGC();
> > +
> > +    configuration = new Configuration();
> > +    service = Executors.newThreadPool("test", 10);
> > +    _refresher = new BlurIndexRefresher();
> > +    _closer = new BlurIndexCloser();
> > +  }
> > +
> > +  private void setupWriter(Configuration configuration, long refresh,
> boolean reload) throws IOException {
> > +    TableDescriptor tableDescriptor = new TableDescriptor();
> > +    tableDescriptor.setName("test-table");
> > +    /*
> > +     * if reload is set to true...we create a new writer instance
> pointing
> > +     * to the same location as the old one.....
> > +     * so previous writer instances should be closed
> > +     */
> > +
> > +    if (!reload && uuid == null) {
> > +      uuid = UUID.randomUUID().toString();
> > +    }
> > +
> > +    tableDescriptor.setTableUri(new File(base, "table-store-" +
> uuid).toURI().toString());
> > +
>  tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs",
> Long.toString(refresh));
> > +
> > +    TableContext tableContext = TableContext.create(tableDescriptor);
> > +    File path = new File(base, "index_" + uuid);
> > +    path.mkdirs();
> > +    FSDirectory directory = FSDirectory.open(path);
> > +    ShardContext shardContext = ShardContext.create(tableContext,
> "test-shard-" + uuid);
> > +    writer = new BlurIndexNRTSimple(shardContext, directory,
> mergeScheduler, gc, service, _closer, _refresher);
> > +  }
> > +
> > +  @After
> > +  public void tearDown() throws IOException {
> > +    _refresher.close();
> > +    writer.close();
> > +    mergeScheduler.close();
> > +    gc.close();
> > +    service.shutdownNow();
> > +    rm(base);
> > +  }
> > +
> > +  private void rm(File file) {
> > +    if (!file.exists()) {
> > +      return;
> > +    }
> > +    if (file.isDirectory()) {
> > +      for (File f : file.listFiles()) {
> > +        rm(f);
> > +      }
> > +    }
> > +    file.delete();
> > +  }
> > +
> > +  @Test
> > +  public void testBlurIndexWriter() throws IOException {
> > +    setupWriter(configuration, 5, false);
> > +    long s = System.nanoTime();
> > +    int total = 0;
> > +    for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
> > +      writer.replaceRow(true, true, genRow());
> > +      IndexSearcherClosable searcher = writer.getIndexSearcher();
> > +      IndexReader reader = searcher.getIndexReader();
> > +      assertEquals(i + 1, reader.numDocs());
> > +      searcher.close();
> > +      total++;
> > +    }
> > +    long e = System.nanoTime();
> > +    double seconds = (e - s) / 1000000000.0;
> > +    double rate = total / seconds;
> > +    System.out.println("Rate " + rate);
> > +    IndexSearcherClosable searcher = writer.getIndexSearcher();
> > +    IndexReader reader = searcher.getIndexReader();
> > +    assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs());
> > +    searcher.close();
> > +  }
> > +
> > +  @Test
> > +  public void testBlurIndexWriterFaster() throws IOException,
> InterruptedException {
> > +    setupWriter(configuration, 100, false);
> > +    IndexSearcherClosable searcher1 = writer.getIndexSearcher();
> > +    IndexReader reader1 = searcher1.getIndexReader();
> > +    assertEquals(0, reader1.numDocs());
> > +    searcher1.close();
> > +    long s = System.nanoTime();
> > +    int total = 0;
> > +    for (int i = 0; i < TEST_NUMBER; i++) {
> > +      if (i == TEST_NUMBER - 1) {
> > +        writer.replaceRow(true, true, genRow());
> > +      } else {
> > +        writer.replaceRow(false, true, genRow());
> > +      }
> > +      total++;
> > +    }
> > +    long e = System.nanoTime();
> > +    double seconds = (e - s) / 1000000000.0;
> > +    double rate = total / seconds;
> > +    System.out.println("Rate " + rate);
> > +    // //wait one second for the data to become visible the test is set
> to
> > +    // refresh once every 25 ms
> > +    // Thread.sleep(1000);
> > +    writer.refresh();
> > +    IndexSearcherClosable searcher2 = writer.getIndexSearcher();
> > +    IndexReader reader2 = searcher2.getIndexReader();
> > +    assertEquals(TEST_NUMBER, reader2.numDocs());
> > +    searcher2.close();
> > +  }
> > +
> > +  private Row genRow() {
> > +    Row row = new Row();
> > +    row.setId(Long.toString(random.nextLong()));
> > +    Record record = new Record();
> > +    record.setFamily("testing");
> > +    record.setRecordId(Long.toString(random.nextLong()));
> > +    for (int i = 0; i < 10; i++) {
> > +      record.addToColumns(new Column("col" + i,
> Long.toString(random.nextLong())));
> > +    }
> > +    row.addToRecords(record);
> > +    return row;
> > +  }
> > +
> > +//  @Test
> > +//  public void testCreateSnapshot() throws IOException {
> > +//    setupWriter(configuration, 5, false);
> > +//    writer.createSnapshot("test_snapshot");
> > +//    assertTrue(writer.getSnapshots().contains("test_snapshot"));
> > +//
> > +//    // check that the file is persisted
> > +//    Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
> > +//    FileSystem fileSystem = snapshotsDirPath.getFileSystem(new
> Configuration());
> > +//    Path snapshotFilePath = new Path(snapshotsDirPath,
> "test_snapshot");
> > +//    assertTrue(fileSystem.exists(snapshotFilePath));
> > +//
> > +//    // create a new writer instance and test whether the snapshots
> are loaded properly
> > +//    writer.close();
> > +//    setupWriter(configuration, 5, true);
> > +//    assertTrue(writer.getSnapshots().contains("test_snapshot"));
> > +//  }
> > +//
> > +//
> > +//  @Test
> > +//  public void testRemoveSnapshots() throws IOException {
> > +//    setupWriter(configuration, 5, false);
> > +//    Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
> > +//    FileSystem fileSystem = snapshotsDirPath.getFileSystem(new
> Configuration());
> > +//    fileSystem.mkdirs(snapshotsDirPath);
> > +//
> > +//    // create 2 files in snapshots sub-dir
> > +//    Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
> > +//    Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
> > +//
> > +//    BufferedWriter br1 = new BufferedWriter(new
> OutputStreamWriter(fileSystem.create(snapshotFile1, true)));
> > +//    br1.write("segments_1");
> > +//    br1.close();
> > +//
> > +//    BufferedWriter br2 = new BufferedWriter(new
> OutputStreamWriter(fileSystem.create(snapshotFile2, true)));
> > +//    br2.write("segments_1");
> > +//    br2.close();
> > +//
> > +//    // re-load the writer to load the snpshots
> > +//    writer.close();
> > +//    setupWriter(configuration, 5, true);
> > +//    assertEquals(writer.getSnapshots().size(), 2);
> > +//
> > +//
> > +//    writer.removeSnapshot("test_snapshot2");
> > +//    assertEquals(writer.getSnapshots().size(), 1);
> > +//    assertTrue(!writer.getSnapshots().contains("test_snapshot2"));
> > +//    assertTrue(!fileSystem.exists(snapshotFile2));
> > +//
> > +//  }
> > +}
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
> b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
> > index 01762b1..cb7d649 100644
> > ---
> a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
> > +++
> b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
> > @@ -90,7 +90,7 @@ public class BlurIndexReaderTest {
> >      ShardContext shardContext = ShardContext.create(tableContext,
> "test-shard");
> >      refresher = new BlurIndexRefresher();
> >      indexCloser = new BlurIndexCloser();
> > -    reader = new BlurIndexReader(shardContext, directory, refresher,
> indexCloser);
> > +    reader = new BlurIndexReader(shardContext, directory, null, null,
> null, indexCloser, refresher);
> >    }
> >
> >    @After
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
> b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
> > index 5f40fac..9f2ffc3 100644
> > ---
> a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
> > +++
> b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
> > @@ -81,15 +81,15 @@ public class BlurNRTIndexTest {
> >      TableDescriptor tableDescriptor = new TableDescriptor();
> >      tableDescriptor.setName("test-table");
> >      /*
> > -     * if reload is set to true...we create a new writer instance
> pointing
> > -     * to the same location as the old one.....
> > -     * so previous writer instances should be closed
> > +     * if reload is set to true...we create a new writer instance
> pointing to
> > +     * the same location as the old one..... so previous writer
> instances should
> > +     * be closed
> >       */
> > -
> > +
> >      if (!reload && uuid == null) {
> >        uuid = UUID.randomUUID().toString();
> >      }
> > -
> > +
> >      tableDescriptor.setTableUri(new File(base, "table-store-" +
> uuid).toURI().toString());
> >
>  tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs",
> Long.toString(refresh));
> >
> > @@ -98,7 +98,7 @@ public class BlurNRTIndexTest {
> >      path.mkdirs();
> >      FSDirectory directory = FSDirectory.open(path);
> >      ShardContext shardContext = ShardContext.create(tableContext,
> "test-shard-" + uuid);
> > -    writer = new BlurNRTIndex(shardContext, mergeScheduler, directory,
> gc, service);
> > +    writer = new BlurNRTIndex(shardContext, directory, mergeScheduler,
> gc, service, null, null);
> >    }
> >
> >    @After
> > @@ -194,45 +194,44 @@ public class BlurNRTIndexTest {
> >      setupWriter(configuration, 5, false);
> >      writer.createSnapshot("test_snapshot");
> >      assertTrue(writer.getSnapshots().contains("test_snapshot"));
> > -
> > +
> >      // check that the file is persisted
> >      Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
> >      FileSystem fileSystem = snapshotsDirPath.getFileSystem(new
> Configuration());
> >      Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot");
> >      assertTrue(fileSystem.exists(snapshotFilePath));
> > -
> > -    // create a new writer instance and test whether the snapshots are
> loaded properly
> > +
> > +    // create a new writer instance and test whether the snapshots are
> loaded
> > +    // properly
> >      writer.close();
> >      setupWriter(configuration, 5, true);
> >      assertTrue(writer.getSnapshots().contains("test_snapshot"));
> >    }
> > -
> > -
> > +
> >    @Test
> >    public void testRemoveSnapshots() throws IOException {
> >      setupWriter(configuration, 5, false);
> >      Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
> >      FileSystem fileSystem = snapshotsDirPath.getFileSystem(new
> Configuration());
> >      fileSystem.mkdirs(snapshotsDirPath);
> > -
> > +
> >      // create 2 files in snapshots sub-dir
> >      Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
> >      Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
> > -
> > +
> >      BufferedWriter br1 = new BufferedWriter(new
> OutputStreamWriter(fileSystem.create(snapshotFile1, true)));
> >      br1.write("segments_1");
> >      br1.close();
> > -
> > +
> >      BufferedWriter br2 = new BufferedWriter(new
> OutputStreamWriter(fileSystem.create(snapshotFile2, true)));
> >      br2.write("segments_1");
> >      br2.close();
> > -
> > +
> >      // re-load the writer to load the snpshots
> >      writer.close();
> >      setupWriter(configuration, 5, true);
> >      assertEquals(writer.getSnapshots().size(), 2);
> > -
> > -
> > +
> >      writer.removeSnapshot("test_snapshot2");
> >      assertEquals(writer.getSnapshots().size(), 1);
> >      assertTrue(!writer.getSnapshots().contains("test_snapshot2"));
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3dc5b842/blur-util/src/main/resources/blur-default.properties
> > ----------------------------------------------------------------------
> > diff --git a/blur-util/src/main/resources/blur-default.properties
> b/blur-util/src/main/resources/blur-default.properties
> > index 72bf66d..1876519 100644
> > --- a/blur-util/src/main/resources/blur-default.properties
> > +++ b/blur-util/src/main/resources/blur-default.properties
> > @@ -181,6 +181,9 @@ blur.gui.shard.port=40090
> >  # To intercept the calls made to the shard server and perform server
> side changes to the calls extend org.apache.blur.server.FilteredBlurServer.
> >  blur.shard.filtered.server.class=
> >
> > +# Defines the blur index class to be used to handle index requests.
>  This class has to extend org.apache.blur.manager.writer.BlurIndex.  This
> can be defined globally as well as per table.
> > +blur.shard.blurindex.class=
> > +
> >
> >  ### Controller Server Configuration
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message