incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [8/8] git commit: A bunch of changes to deal with near real time updates. Removed a lot of legacy code and the need for the BlurIndexCloser in NRT updates. There is still a problem with files not being deleted inside the DirRefFileGC this is due to s
Date Sun, 03 Mar 2013 19:37:03 GMT
Updated Branches:
  refs/heads/0.2-dev 484edcdde -> 1373ccec0


A bunch of changes to deal with near real time updates.  Removed a lot of legacy code and
the need for the BlurIndexCloser in NRT updates.  There is still a problem with files not
being deleted inside the DirRefFileGC; this is due to some issues in Lucene 4.0 not closing
clones of inputs correctly.  I am going to update to 4.1 and retest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/1373ccec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/1373ccec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/1373ccec

Branch: refs/heads/0.2-dev
Commit: 1373ccec01653ba082c322640bf9fd72a8e34d9d
Parents: cf92e0f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Mar 3 14:34:29 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Mar 3 14:34:29 2013 -0500

----------------------------------------------------------------------
 .../manager/indexserver/AbstractIndexServer.java   |   37 +----
 .../indexserver/DistributedIndexServer.java        |   82 +---------
 .../blur/manager/writer/AbstractBlurIndex.java     |   11 +-
 .../org/apache/blur/manager/writer/BlurIndex.java  |    4 +-
 .../blur/manager/writer/BlurIndexCloser.java       |    6 +-
 .../apache/blur/manager/writer/BlurNRTIndex.java   |  127 +++++++--------
 .../java/org/apache/blur/server/BlurServer.java    |   28 ++--
 .../apache/blur/server/IndexSearcherClosable.java  |   26 +++
 .../java/org/apache/blur/server/SessionInfo.java   |   44 ++---
 .../org/apache/blur/server/SessionManager.java     |   54 ++++++
 .../org/apache/blur/thrift/ThriftBlurServer.java   |    3 -
 .../refcounter/DirectoryReferenceCounter.java      |    7 +-
 .../store/refcounter/DirectoryReferenceFileGC.java |    2 +-
 13 files changed, 201 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
index b9e0e6b..af46e92 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
@@ -21,11 +21,7 @@ import java.util.Map;
 
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
+import org.apache.blur.server.IndexSearcherClosable;
 
 public abstract class AbstractIndexServer implements IndexServer {
 
@@ -33,13 +29,13 @@ public abstract class AbstractIndexServer implements IndexServer {
     long recordCount = 0;
     Map<String, BlurIndex> indexes = getIndexes(table);
     for (Map.Entry<String, BlurIndex> index : indexes.entrySet()) {
-      IndexReader indexReader = null;
+      IndexSearcherClosable indexSearcher = null;
       try {
-        indexReader = index.getValue().getIndexReader();
-        recordCount += indexReader.numDocs();
+        indexSearcher = index.getValue().getIndexReader();
+        recordCount += indexSearcher.getIndexReader().numDocs();
       } finally {
-        if (indexReader != null) {
-          indexReader.decRef();
+        if (indexSearcher != null) {
+          indexSearcher.close();
         }
       }
     }
@@ -47,25 +43,6 @@ public abstract class AbstractIndexServer implements IndexServer {
   }
 
   public long getRowCount(String table) throws IOException {
-    long rowCount = 0;
-    Map<String, BlurIndex> indexes = getIndexes(table);
-    for (Map.Entry<String, BlurIndex> index : indexes.entrySet()) {
-      IndexReader indexReader = null;
-      try {
-        indexReader = index.getValue().getIndexReader();
-        rowCount += getRowCount(indexReader);
-      } finally {
-        if (indexReader != null) {
-          indexReader.decRef();
-        }
-      }
-    }
-    return rowCount;
-  }
-
-  private long getRowCount(IndexReader indexReader) throws IOException {
-    IndexSearcher searcher = new IndexSearcher(indexReader);
-    TopDocs topDocs = searcher.search(new TermQuery(BlurConstants.PRIME_DOC_TERM), 1);
-    return topDocs.totalHits;
+    return -1;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 1771960..881f7f7 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -47,7 +47,6 @@ import org.apache.blur.manager.BlurFilterCache;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
 import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexReader;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.manager.writer.BlurNRTIndex;
@@ -56,7 +55,6 @@ import org.apache.blur.metrics.BlurMetrics;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.server.ZooKeeperTypeManager;
-import org.apache.blur.store.blockcache.Cache;
 import org.apache.blur.store.hdfs.BlurLockFactory;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
@@ -68,7 +66,6 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.Directory;
 import org.apache.zookeeper.CreateMode;
@@ -95,7 +92,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private String _nodeName;
   private int _shardOpenerThreadCount;
   private BlurIndexRefresher _refresher;
-  private Cache _cache;
   private BlurMetrics _blurMetrics;
   private ZooKeeper _zookeeper;
   private String _cluster;
@@ -105,11 +101,9 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private Timer _timerTableWarmer;
 
   private ExecutorService _openerService;
-  private BlurIndexCloser _closer;
   private BlurFilterCache _filterCache;
   private AtomicBoolean _running = new AtomicBoolean();
   private long _safeModeDelay;
-  private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup();
   private DirectoryReferenceFileGC _gc;
   private WatchChildren _watchOnlineShards;
   private SharedMergeScheduler _mergeScheduler;
@@ -121,8 +115,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
   public void init() throws KeeperException, InterruptedException, IOException {
     BlurUtil.setupZookeeper(_zookeeper, _cluster);
     _openerService = Executors.newThreadPool("shard-opener", _shardOpenerThreadCount);
-    _closer = new BlurIndexCloser();
-    _closer.init();
     _gc = new DirectoryReferenceFileGC();
     _gc.init();
     _mergeScheduler = new SharedMergeScheduler();
@@ -384,7 +376,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
 
       _timerTableWarmer.purge();
       _timerTableWarmer.cancel();
-      _closer.close();
       _gc.close();
       _openerService.shutdownNow();
     }
@@ -456,10 +447,11 @@ public class DistributedIndexServer extends AbstractIndexServer {
     Path hdfsDirPath = shardContext.getHdfsDirPath();
 
     BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName,
BlurConstants.getPid());
-//    Directory directory = shardContext.getDirectory();
-//    directory.setLockFactory(lockFactory);
-//    Directory dir = new BlockDirectory(table + "_" + shard, directory, _cache);
-    
+    // Directory directory = shardContext.getDirectory();
+    // directory.setLockFactory(lockFactory);
+    // Directory dir = new BlockDirectory(table + "_" + shard, directory,
+    // _cache);
+
     Directory dir = shardContext.getDirectory();
     dir.setLockFactory(lockFactory);
 
@@ -468,7 +460,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
       BlurIndexReader reader = new BlurIndexReader();
       reader.setContext(shardContext);
       reader.setDirectory(dir);
-      reader.setCloser(_closer);
       reader.setRefresher(_refresher);
       reader.init();
       index = reader;
@@ -476,7 +467,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
       BlurNRTIndex writer = new BlurNRTIndex();
       writer.setContext(shardContext);
       writer.setDirectory(dir);
-      writer.setCloser(_closer);
       writer.setGc(_gc);
       writer.setMergeScheduler(_mergeScheduler);
       writer.init();
@@ -487,58 +477,10 @@ public class DistributedIndexServer extends AbstractIndexServer {
   }
 
   private BlurIndex warmUp(BlurIndex index, TableDescriptor table, String shard) throws IOException
{
-    final IndexReader reader = index.getIndexReader();
-    warmUpAllSegments(reader);
-    _warmup.warmBlurIndex(table, shard, reader, index.isClosed(), new ReleaseReader() {
-      @Override
-      public void release() throws IOException {
-        // this will allow for closing of index
-        reader.decRef();
-      }
-    });
-
+    LOG.warn("Do something about warmup");
     return index;
   }
 
-  private void warmUpAllSegments(IndexReader reader) throws IOException {
-    LOG.warn("Warm up NOT supported yet.");
-    // Once the reader warm-up has been re-implemented, this code will change
-    // accordingly.
-
-    // IndexReader[] indexReaders = reader.getSequentialSubReaders();
-    // if (indexReaders != null) {
-    // for (IndexReader r : indexReaders) {
-    // warmUpAllSegments(r);
-    // }
-    // }
-    // int maxDoc = reader.maxDoc();
-    // int numDocs = reader.numDocs();
-    // FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
-    // Collection<String> fieldNames = new ArrayList<String>();
-    // for (FieldInfo fieldInfo : fieldInfos) {
-    // if (fieldInfo.isIndexed) {
-    // fieldNames.add(fieldInfo.name);
-    // }
-    // }
-    // int primeDocCount = reader.docFreq(BlurConstants.PRIME_DOC_TERM);
-    // TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
-    // termDocs.next();
-    // termDocs.close();
-    //
-    // TermPositions termPositions =
-    // reader.termPositions(BlurConstants.PRIME_DOC_TERM);
-    // if (termPositions.next()) {
-    // if (termPositions.freq() > 0) {
-    // termPositions.nextPosition();
-    // }
-    // }
-    // termPositions.close();
-    // LOG.info("Warmup of indexreader [" + reader + "] complete, maxDocs [" +
-    // maxDoc + "], numDocs [" + numDocs + "], primeDocumentCount [" +
-    // primeDocCount + "], fieldCount ["
-    // + fieldNames.size() + "]");
-  }
-
   private synchronized Map<String, BlurIndex> openMissingShards(final String table,
Set<String> shardsToServe, final Map<String, BlurIndex> tableIndexes) throws IOException
{
     LOG.debug("Opening missing shards for table [{0}]", table);
     LOG.debug("Getting table context for table [{0}]", table);
@@ -729,18 +671,10 @@ public class DistributedIndexServer extends AbstractIndexServer {
     _refresher = refresher;
   }
 
-  public void setCache(Cache cache) {
-    _cache = cache;
-  }
-
   public void setBlurMetrics(BlurMetrics blurMetrics) {
     _blurMetrics = blurMetrics;
   }
 
-  public void setCloser(BlurIndexCloser closer) {
-    _closer = closer;
-  }
-
   public void setZookeeper(ZooKeeper zookeeper) {
     _zookeeper = zookeeper;
   }
@@ -753,10 +687,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
     _safeModeDelay = safeModeDelay;
   }
 
-  public void setWarmup(BlurIndexWarmup warmup) {
-    _warmup = warmup;
-  }
-
   public void setClusterName(String cluster) {
     _cluster = cluster;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
b/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
index d74f51d..da3cbbc 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
@@ -31,6 +32,7 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 
 public abstract class AbstractBlurIndex extends BlurIndex {
@@ -73,10 +75,11 @@ public abstract class AbstractBlurIndex extends BlurIndex {
   }
 
   @Override
-  public IndexReader getIndexReader() throws IOException {
-    IndexReader indexReader = _indexReaderRef.get();
-    indexReader.incRef();
-    return indexReader;
+  public IndexSearcherClosable getIndexReader() throws IOException {
+    throw new RuntimeException("Not implemented");
+//    IndexReader indexReader = _indexReaderRef.get();
+//    indexReader.incRef();
+//    return indexReader;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index d7dce38..cd14aca 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -20,15 +20,15 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Term;
 import org.apache.blur.thrift.generated.UpdatePackage;
-import org.apache.lucene.index.IndexReader;
 
 public abstract class BlurIndex {
 
-  public abstract IndexReader getIndexReader() throws IOException;
+  public abstract IndexSearcherClosable getIndexReader() throws IOException;
 
   public abstract void close() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
index e096818..239f352 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
@@ -79,13 +79,13 @@ public class BlurIndexCloser implements Runnable {
     Iterator<IndexReader> it = readers.iterator();
     while (it.hasNext()) {
       IndexReader reader = it.next();
-      if (reader.getRefCount() == 1) {
+      if (reader.getRefCount() <= 2) {
+        //2 because the NRTManger inc before giving the inital reference out.
         it.remove();
         closeInternal(reader);
       } else {
-        LOG.debug("Could not close indexreader [" + reader + "] because of ref count [" +
reader.getRefCount() + "].");
+        LOG.info("Could not close indexreader [" + reader + "] because of ref count [" +
reader.getRefCount() + "].");
       }
-      closeInternal(reader);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 5288a4f..d0bfced 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -20,6 +20,7 @@ import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -29,6 +30,7 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Document;
@@ -52,38 +54,30 @@ public class BlurNRTIndex extends BlurIndex {
   private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
   private static final boolean APPLY_ALL_DELETES = true;
 
-  private NRTManager _nrtManager;
+  private AtomicReference<NRTManager> _nrtManagerRef = new AtomicReference<NRTManager>();
   private AtomicBoolean _isClosed = new AtomicBoolean();
   private IndexWriter _writer;
   private Thread _committer;
-  private SearcherFactory _searcherFactory = new SearcherFactory();
-  private AtomicReference<IndexReader> _indexRef = new AtomicReference<IndexReader>();
+  private SearcherFactory _searcherFactory;
   private long _lastRefresh;
-  private long _timeBetweenRefreshsNanos;
 
   // externally set
   private Directory _directory;
   private NRTManagerReopenThread _refresher;
-
-  private BlurIndexCloser _closer;
   private DirectoryReferenceFileGC _gc;
   private TableContext tableContext;
   private ShardContext shardContext;
   private SharedMergeScheduler mergeScheduler;
-
-  public void setMergeScheduler(SharedMergeScheduler mergeScheduler) {
-    this.mergeScheduler = mergeScheduler;
-  }
+  private ExecutorService searchExecutor;
 
   // created
   private TransactionRecorder _recorder;
   private TrackingIndexWriter _trackingWriter;
+  
 
   public void init() throws IOException {
     tableContext = shardContext.getTableContext();
 
-    _timeBetweenRefreshsNanos = TimeUnit.MILLISECONDS.toNanos(tableContext.getTimeBetweenRefreshs());
-
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, tableContext.getAnalyzer());
     conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
     conf.setSimilarity(tableContext.getSimilarity());
@@ -96,19 +90,26 @@ public class BlurNRTIndex extends BlurIndex {
     _writer = new IndexWriter(referenceCounter, conf);
     _recorder = new TransactionRecorder();
     _recorder.setContext(shardContext);
-
     _recorder.init();
     _recorder.replay(_writer);
 
+    _searcherFactory = new SearcherFactory() {
+      @Override
+      public IndexSearcher newSearcher(IndexReader reader) throws IOException {
+        return new IndexSearcherClosable(reader, searchExecutor, _nrtManagerRef);
+      }
+    };
+
     _trackingWriter = new TrackingIndexWriter(_writer);
-    _nrtManager = new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES);
-    IndexSearcher searcher = _nrtManager.acquire();
-    _indexRef.set(searcher.getIndexReader());
-    _lastRefresh = System.nanoTime();
+    _nrtManagerRef.set(new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES));
     startCommiter();
     startRefresher();
   }
 
+  private NRTManager getNRTManager() {
+    return _nrtManagerRef.get();
+  }
+
   @Override
   public long addDocuments(boolean waitToBeVisible, boolean wal, List<Document> documents)
throws IOException {
     long generation = _recorder.addDocuments(wal, documents, _trackingWriter);
@@ -140,20 +141,18 @@ public class BlurNRTIndex extends BlurIndex {
   @Override
   public synchronized void blockUntilGenerationIsVisible(long generation, boolean forceRefresh)
throws IOException {
     if (forceRefresh) {
-      _nrtManager.maybeRefresh();
-      swap();
+      getNRTManager().maybeRefreshBlocking();
     }
     waitToBeVisible(true, generation);
   }
 
+  /**
+   * The method fetches a reference to the IndexSearcher, the caller is
+   * responsible for calling close on the searcher.
+   */
   @Override
-  public IndexReader getIndexReader() throws IOException {
-    IndexReader indexReader = _indexRef.get();
-    while (!indexReader.tryIncRef()) {
-      indexReader = _indexRef.get();
-    }
-    LOG.debug("Index fetched with ref of [{0}] [{1}]", indexReader.getRefCount(), indexReader);
-    return indexReader;
+  public IndexSearcherClosable getIndexReader() throws IOException {
+    return (IndexSearcherClosable) getNRTManager().acquire();
   }
 
   @Override
@@ -165,8 +164,7 @@ public class BlurNRTIndex extends BlurIndex {
     try {
       _recorder.close();
       _writer.close();
-      _closer.close(_indexRef.get());
-      _nrtManager.close();
+      getNRTManager().close();
     } finally {
       _directory.close();
     }
@@ -174,8 +172,8 @@ public class BlurNRTIndex extends BlurIndex {
 
   @Override
   public void refresh() throws IOException {
-    _nrtManager.maybeRefresh();
-    swap();
+    getNRTManager().maybeRefresh();
+    _lastRefresh = System.currentTimeMillis();
   }
 
   @Override
@@ -189,37 +187,25 @@ public class BlurNRTIndex extends BlurIndex {
   }
 
   private void waitToBeVisible(boolean waitToBeVisible, long generation) throws IOException
{
-    if (waitToBeVisible && _nrtManager.getCurrentSearchingGen() < generation)
{
-      // if visibility is required then reopen.
-      _nrtManager.waitForGeneration(generation);
-      swap();
-    } else {
-      long now = System.nanoTime();
-      if (_lastRefresh + _timeBetweenRefreshsNanos < now) {
-        refresh();
-        _lastRefresh = now;
-      }
+    if (needsRefresh()) {
+      refresh();
+    }
+    if (waitToBeVisible && getNRTManager().getCurrentSearchingGen() < generation)
{
+      getNRTManager().waitForGeneration(generation);
     }
   }
 
-  private void swap() {
-    IndexSearcher searcher = _nrtManager.acquire();
-    IndexReader indexReader = searcher.getIndexReader();
-    int numberOfLeaves = indexReader.leaves().size();
-    IndexReader oldIndexReader = _indexRef.getAndSet(indexReader);
-    int oldNumberOfLeaves = oldIndexReader.leaves().size();
-    _closer.close(oldIndexReader);
-    LOG.debug("Old index version had [{1}] leaves new version has [{2}] leaves, for directory
[{0}].", indexReader, oldNumberOfLeaves, numberOfLeaves);
+  private boolean needsRefresh() {
+    if (_lastRefresh + tableContext.getTimeBetweenRefreshs() < System.currentTimeMillis())
{
+      return true;
+    }
+    return false;
   }
 
   public void setDirectory(Directory directory) {
     _directory = directory;
   }
 
-  public void setCloser(BlurIndexCloser closer) {
-    _closer = closer;
-  }
-
   public DirectoryReferenceFileGC getGc() {
     return _gc;
   }
@@ -230,7 +216,7 @@ public class BlurNRTIndex extends BlurIndex {
 
   private void startRefresher() {
     double targetMinStaleSec = tableContext.getTimeBetweenRefreshs() / 1000.0;
-    _refresher = new NRTManagerReopenThread(_nrtManager, targetMinStaleSec * 10, targetMinStaleSec);
+    _refresher = new NRTManagerReopenThread(getNRTManager(), targetMinStaleSec * 10, targetMinStaleSec);
     _refresher.setName("Refresh Thread [" + tableContext.getTable() + "/" + shardContext.getShard()
+ "]");
     _refresher.setDaemon(true);
     _refresher.start();
@@ -240,22 +226,24 @@ public class BlurNRTIndex extends BlurIndex {
     _committer = new Thread(new Runnable() {
       @Override
       public void run() {
-        while (!_isClosed.get()) {
-          try {
-            LOG.debug("Committing of [{0}/{1}].", tableContext.getTable(), shardContext.getShard());
-            _recorder.commit(_writer);
-          } catch (CorruptIndexException e) {
-            LOG.error("Curruption Error during commit of [{0}/{1}].", e, tableContext.getTable(),
shardContext.getShard());
-          } catch (IOException e) {
-            LOG.error("IO Error during commit of [{0}/{1}].", e, tableContext.getTable(),
shardContext.getShard());
-          }
-          try {
-            Thread.sleep(tableContext.getTimeBetweenCommits());
-          } catch (InterruptedException e) {
-            if (_isClosed.get()) {
-              return;
+        synchronized (this) {
+          while (!_isClosed.get()) {
+            try {
+              LOG.debug("Committing of [{0}/{1}].", tableContext.getTable(), shardContext.getShard());
+              _recorder.commit(_writer);
+            } catch (CorruptIndexException e) {
+              LOG.error("Curruption Error during commit of [{0}/{1}].", e, tableContext.getTable(),
shardContext.getShard());
+            } catch (IOException e) {
+              LOG.error("IO Error during commit of [{0}/{1}].", e, tableContext.getTable(),
shardContext.getShard());
+            }
+            try {
+              wait(tableContext.getTimeBetweenCommits());
+            } catch (InterruptedException e) {
+              if (_isClosed.get()) {
+                return;
+              }
+              LOG.error("Unknown error with committer thread [{0}/{1}].", e, tableContext.getTable(),
shardContext.getShard());
             }
-            LOG.error("Unknown error with committer thread [{0}/{1}].", e, tableContext.getTable(),
shardContext.getShard());
           }
         }
       }
@@ -273,4 +261,7 @@ public class BlurNRTIndex extends BlurIndex {
     this.shardContext = context;
   }
 
+  public void setMergeScheduler(SharedMergeScheduler mergeScheduler) {
+    this.mergeScheduler = mergeScheduler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java b/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java
index 0157a25..e6185ea 100644
--- a/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/server/BlurServer.java
@@ -33,7 +33,6 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -72,7 +71,6 @@ import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.BlurValidations;
 import org.apache.blur.utils.Merge;
 import org.apache.blur.utils.ThriftLuceneConversion;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.thrift.TException;
 
@@ -85,10 +83,9 @@ public class BlurServer extends TableAdmin implements Iface {
   private int _maxQueryCacheElements = 128;
   private ExecutorService _dataFetch;
   private ExecutorService _indexSearcherExecutor;
-  private ExecutorService _searchExecutor;
   private int _dataFetchThreadCount = 32;
   private TableLayout _layout;
-  private Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
+  private SessionManager _sessionManager;
   private String _nodeName;
   private QueryStatusContainer _queryStatusContainer;
 
@@ -118,8 +115,7 @@ public class BlurServer extends TableAdmin implements Iface {
 
   public void init() throws BlurException {
     _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
-    _indexSearcherExecutor = Executors.newThreadPool("index-searcher-", 16);
-    _searchExecutor = Executors.newThreadPool("search-", 16);
+    _indexSearcherExecutor = Executors.newThreadPool("search-executor-", 16);
 
     if (_configuration == null) {
       throw new BException("Configuration must be set before initialization.");
@@ -128,6 +124,8 @@ public class BlurServer extends TableAdmin implements Iface {
     _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS,
128);
     _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
     _queryStatusContainer = new QueryStatusContainer(_configuration.getInt(BLUR_QUERYSTATUS_TRACKING_MAX,
16384));
+    _sessionManager = new SessionManager();
+    _sessionManager.init();
   }
 
   @Override
@@ -181,16 +179,14 @@ public class BlurServer extends TableAdmin implements Iface {
     for (Entry<String, BlurIndex> entry : blurIndexes.entrySet()) {
       int index = BlurUtil.getShardIndex(entry.getKey());
       try {
-        IndexReader indexReader = entry.getValue().getIndexReader();
+        IndexSearcherClosable indexSearcher = entry.getValue().getIndexReader();
         // @TODO use new thread pool here
-        IndexSearcher indexSearcher = new IndexSearcher(indexReader, _searchExecutor);
-        sessionInfo.add(index, indexReader);
         sessionInfo.add(index, indexSearcher);
       } catch (IOException e) {
         LOG.error("Unknown error while trying to fetch index readers.", e);
       }
     }
-    sessions.put(uuid, sessionInfo);
+    _sessionManager.put(uuid, sessionInfo);
     return new Session(uuid, table);
   }
 
@@ -199,7 +195,7 @@ public class BlurServer extends TableAdmin implements Iface {
     SessionInfo info = getSessionInfo(session);
     String queryId = _queryStatusContainer.start(queryArgs);
     try {
-      Map<Integer, IndexSearcher> searchers = info.getSearchers();
+      Map<Integer, IndexSearcherClosable> searchers = info.getSearchers();
       List<Integer> shardIndexes = queryArgs.getShardIndexes();
       TableDescriptor tableDescriptor = info.getTableDescriptor();
       Collection<SearchAction> searchersToSearch = getSearchActions(tableDescriptor,
shardIndexes, searchers);
@@ -262,7 +258,8 @@ public class BlurServer extends TableAdmin implements Iface {
     return result;
   }
 
-  private Collection<SearchAction> getSearchActions(TableDescriptor tableDescriptor, List<Integer> shardIndexes, Map<Integer, IndexSearcher> searchers) throws BlurException {
+  private Collection<SearchAction> getSearchActions(TableDescriptor tableDescriptor, List<Integer> shardIndexes, Map<Integer, IndexSearcherClosable> searchers)
+      throws BlurException {
     String name = tableDescriptor.getName();
     int shardCount = tableDescriptor.getShardCount();
     Collection<SearchAction> searchersToSearch = new ArrayList<SearchAction>();
@@ -285,7 +282,7 @@ public class BlurServer extends TableAdmin implements Iface {
   }
 
   private SessionInfo getSessionInfo(Session session) throws BlurException {
-    SessionInfo info = sessions.get(session.getSessionId());
+    SessionInfo info = _sessionManager.get(session.getSessionId());
     if (info == null) {
       newSession(session.getTableName(), session.getSessionId());
       return getSessionInfo(session);
@@ -298,7 +295,7 @@ public class BlurServer extends TableAdmin implements Iface {
     try {
       SessionInfo sessionInfo = getSessionInfo(session);
       TableContext context = _indexServer.getTableContext(session.getTableName());
-      Map<Integer, IndexSearcher> searchers = sessionInfo.getSearchers();
+      Map<Integer, IndexSearcherClosable> searchers = sessionInfo.getSearchers();
       List<Document> result = new ArrayList<Document>();
       for (Long docLocation : docLocations) {
         if (docLocation == null) {
@@ -332,8 +329,7 @@ public class BlurServer extends TableAdmin implements Iface {
 
   @Override
   public void closeReadSession(Session session) throws BlurException, TException {
-    SessionInfo sessionInfo = getSessionInfo(session);
-    sessionInfo.releaseReaders();
+    _sessionManager.close(session.getSessionId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/server/IndexSearcherClosable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/IndexSearcherClosable.java b/src/blur-core/src/main/java/org/apache/blur/server/IndexSearcherClosable.java
new file mode 100644
index 0000000..b91c9c5
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/IndexSearcherClosable.java
@@ -0,0 +1,26 @@
+package org.apache.blur.server;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.NRTManager;
+
+public class IndexSearcherClosable extends IndexSearcher implements Closeable {
+
+  private AtomicReference<NRTManager> _nrtManagerRef;
+
+  public IndexSearcherClosable(IndexReader r, ExecutorService executor, AtomicReference<NRTManager> nrtManagerRef) {
+    super(r, executor);
+    _nrtManagerRef = nrtManagerRef;
+  }
+
+  @Override
+  public void close() throws IOException {
+    _nrtManagerRef.get().release(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java b/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java
index b2f4522..f5898ef 100644
--- a/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java
+++ b/src/blur-core/src/main/java/org/apache/blur/server/SessionInfo.java
@@ -25,8 +25,6 @@ import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
 
 public class SessionInfo {
 
@@ -34,9 +32,17 @@ public class SessionInfo {
 
   private String uuid;
   private BlurAnalyzer analyzer;
-  private Map<Integer, IndexReader> readers = new HashMap<Integer, IndexReader>();
-  private Map<Integer, IndexSearcher> searchers = new HashMap<Integer, IndexSearcher>();
+  private Map<Integer, IndexSearcherClosable> searchers = new HashMap<Integer, IndexSearcherClosable>();
   private TableDescriptor tableDescriptor;
+  private long timestamp = System.currentTimeMillis();
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
 
   public BlurAnalyzer getAnalyzer() {
     return analyzer;
@@ -54,39 +60,25 @@ public class SessionInfo {
     this.uuid = uuid;
   }
 
-  public void add(int index, IndexReader indexReader) {
-    readers.put(index, indexReader);
-  }
-
-  public void add(int index, IndexSearcher indexSearcher) {
+  public void add(int index, IndexSearcherClosable indexSearcher) {
     searchers.put(index, indexSearcher);
   }
 
-  public Map<Integer, IndexReader> getReaders() {
-    return readers;
-  }
-
-  public void setReaders(Map<Integer, IndexReader> readers) {
-    this.readers = readers;
-  }
-
-  public Map<Integer, IndexSearcher> getSearchers() {
+  public Map<Integer, IndexSearcherClosable> getSearchers() {
     return searchers;
   }
 
-  public void setSearchers(Map<Integer, IndexSearcher> searchers) {
+  public void setSearchers(Map<Integer, IndexSearcherClosable> searchers) {
     this.searchers = searchers;
   }
 
-  public void releaseReaders() {
-    for (Entry<Integer, IndexReader> entry : readers.entrySet()) {
-      IndexReader reader = entry.getValue();
+  public void close() {
+    for (Entry<Integer, IndexSearcherClosable> entry : searchers.entrySet()) {
+      IndexSearcherClosable searcher = entry.getValue();
       try {
-        LOG.info("Before release reader ref count [{0}] with reader [{1}]", reader.getRefCount(), reader);
-        reader.decRef();
-        LOG.info("After release reader ref count [{0}] with reader [{1}]", reader.getRefCount(), reader);
+        searcher.close();
       } catch (IOException e) {
-        LOG.error("Unknown exception while trying to decRef on reader [{0}]", e, reader);
+        LOG.error("Unknown exception while trying to close on searcher [{0}]", e, searcher);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/server/SessionManager.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/SessionManager.java b/src/blur-core/src/main/java/org/apache/blur/server/SessionManager.java
new file mode 100644
index 0000000..efc85fe
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/SessionManager.java
@@ -0,0 +1,54 @@
+package org.apache.blur.server;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SessionManager extends TimerTask {
+
+  private Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
+  private Timer _timer;
+  private long _delay = 3000;
+
+  public void init() {
+    _timer = new Timer("Session-Cleanup", true);
+    _timer.scheduleAtFixedRate(this, _delay, _delay);
+  }
+
+  public SessionInfo get(String sessionId) {
+    return sessions.get(sessionId);
+  }
+
+  public void put(String uuid, SessionInfo sessionInfo) {
+    sessions.put(uuid, sessionInfo);
+  }
+
+  @Override
+  public void run() {
+    Set<Entry<String, SessionInfo>> entrySet = new HashSet<Entry<String, SessionInfo>>(sessions.entrySet());
+    for (Entry<String, SessionInfo> entry : entrySet) {
+      SessionInfo sessionInfo = entry.getValue();
+      if (isTooOld(sessionInfo)) {
+        close(sessionInfo.getUuid());
+      }
+    }
+  }
+
+  private boolean isTooOld(SessionInfo sessionInfo) {
+    if (sessionInfo.getTimestamp() + 60000 < System.currentTimeMillis()) {
+      return true;
+    }
+    return false;
+  }
+
+  public void close(String sessionId) {
+    SessionInfo sessionInfo = sessions.remove(sessionId);
+    if (sessionInfo != null) {
+      sessionInfo.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
index 4db0232..587b3dc 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurServer.java
@@ -194,11 +194,9 @@ public class ThriftBlurServer extends AbstractThriftServer {
     refresher.init();
 
     BlurFilterCache filterCache = getFilterCache(configuration);
-    BlurIndexWarmup indexWarmup = getIndexWarmup(configuration);
 
     final DistributedIndexServer indexServer = new DistributedIndexServer();
     indexServer.setBlurMetrics(blurMetrics);
-    indexServer.setCache(cache);
     indexServer.setClusterStatus(clusterStatus);
     indexServer.setClusterName(configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER));
     indexServer.setConfiguration(config);
@@ -208,7 +206,6 @@ public class ThriftBlurServer extends AbstractThriftServer {
     indexServer.setZookeeper(zooKeeper);
     indexServer.setFilterCache(filterCache);
     indexServer.setSafeModeDelay(configuration.getLong(BLUR_SHARD_SAFEMODEDELAY, 60000));
-    indexServer.setWarmup(indexWarmup);
     indexServer.init();
 
     TableLayout layout = new TableLayout() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
index baade2f..5ac1633 100644
--- a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
@@ -88,6 +88,7 @@ public class DirectoryReferenceCounter extends Directory {
     private IndexInput input;
     private AtomicInteger ref;
     private boolean closed = false;
+    private Throwable throwable;
 
     public RefIndexInput(String resourceDescription, IndexInput input, AtomicInteger ref) {
       super(resourceDescription);
@@ -99,7 +100,10 @@ public class DirectoryReferenceCounter extends Directory {
     @Override
     protected void finalize() throws Throwable {
       // Seems like not all the clones are being closed...
-      close();
+      if (!closed) {
+        LOG.warn("[" + input.toString() + "] Seems like not all the clones are being closed. Printing stack trace.", throwable);
+        close();
+      }
     }
 
     @Override
@@ -107,6 +111,7 @@ public class DirectoryReferenceCounter extends Directory {
       RefIndexInput ref = (RefIndexInput) super.clone();
       ref.input = (IndexInput) input.clone();
       ref.ref.incrementAndGet();
+      ref.throwable = new Throwable();
       return ref;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1373ccec/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
index ab85f87..ec95633 100644
--- a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
@@ -56,7 +56,7 @@ public class DirectoryReferenceFileGC extends TimerTask {
         directory.deleteFile(name);
         return true;
       } else {
-        LOG.debug("File [{0}] had too many refs [{1}]", name, counter.get());
+        LOG.info("File [{0}] had too many refs [{1}]", name, counter.get());
       }
       return false;
     }


Mime
View raw message