incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] More updates the NRT code is still broken, but it's getting closer.
Date Tue, 19 Mar 2013 02:16:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index 79ba2b8..97dc448 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.lucene.index.IndexReader;
 
-
 public abstract class BlurIndex {
 
   public abstract void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException;
@@ -39,4 +38,5 @@ public abstract class BlurIndex {
 
   public abstract void optimize(int numberOfSegmentsPerShard) throws IOException;
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index 48fe298..28e393a 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -16,18 +16,18 @@ package org.apache.blur.manager.writer;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+
 import java.io.IOException;
 
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.thrift.generated.Row;
-import org.apache.lucene.analysis.KeywordAnalyzer;
-import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.Version;
-
 
 public class BlurIndexReader extends AbstractBlurIndex {
 
@@ -36,11 +36,11 @@ public class BlurIndexReader extends AbstractBlurIndex {
   public void init() throws IOException {
     initIndexWriterConfig();
     Directory directory = getDirectory();
-    if (!IndexReader.indexExists(directory)) {
-      IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, new KeywordAnalyzer());
+    if (!DirectoryReader.indexExists(directory)) {
+      IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer());
       new IndexWriter(directory, conf).close();
     }
-    initIndexReader(IndexReader.open(directory));
+    initIndexReader(DirectoryReader.open(directory));
   }
 
   @Override
@@ -69,4 +69,5 @@ public class BlurIndexReader extends AbstractBlurIndex {
   public void optimize(int numberOfSegmentsPerShard) throws IOException {
     // Do nothing
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index e0e6d48..751a4dc 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -16,7 +16,7 @@ package org.apache.blur.manager.writer;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.lucene.LuceneConstant.LUCENE_VERSION;
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 
 import java.io.IOException;
 import java.util.List;
@@ -28,10 +28,13 @@ import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.lucene.codecs.appending.AppendingCodec;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
@@ -42,10 +45,9 @@ import org.apache.lucene.search.NRTManager;
 import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
 import org.apache.lucene.search.NRTManagerReopenThread;
 import org.apache.lucene.search.SearcherFactory;
-import org.apache.lucene.search.Similarity;
+import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.Directory;
 
-
 public class BlurNRTIndex extends BlurIndex {
 
   private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
@@ -93,72 +95,36 @@ public class BlurNRTIndex extends BlurIndex {
   // };
 
   public void init() throws IOException {
-    Path walTablePath = new Path(_walPath, _table);
-    Path walShardPath = new Path(walTablePath, _shard);
-
-    _timeBetweenRefreshsNanos = TimeUnit.MILLISECONDS.toNanos(_timeBetweenRefreshs);
+    tableContext = shardContext.getTableContext();
 
-    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, tableContext.getAnalyzer());
     conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
-    conf.setSimilarity(_similarity);
-    conf.setIndexDeletionPolicy(_indexDeletionPolicy);
+    conf.setSimilarity(tableContext.getSimilarity());
+    conf.setIndexDeletionPolicy(tableContext.getIndexDeletionPolicy());
+    // conf.setCodec(new AppendingCodec());
     TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);
-    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(_directory, _gc);
+    conf.setMergeScheduler(mergeScheduler);
+    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(_directory, _gc, _closer);
     _writer = new IndexWriter(referenceCounter, conf);
     _recorder = new TransactionRecorder();
-    _recorder.setAnalyzer(_analyzer);
-    _recorder.setConfiguration(_configuration);
-    _recorder.setWalPath(walShardPath);
+    _recorder.setContext(shardContext);
     _recorder.init();
     _recorder.replay(_writer);
 
+    _searcherFactory = new SearcherFactory() {
+      @Override
+      public IndexSearcher newSearcher(IndexReader reader) throws IOException {
+        return new IndexSearcherClosable(reader, searchExecutor, _nrtManagerRef);
+      }
+    };
+
     _trackingWriter = new TrackingIndexWriter(_writer);
-    _nrtManager = new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES);
-    IndexSearcher searcher = _nrtManager.acquire();
-    _indexRef.set(searcher.getIndexReader());
-    _lastRefresh = System.nanoTime();
+    _nrtManagerRef.set(new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES));
     startCommiter();
     startRefresher();
   }
 
-  private void startRefresher() {
-    double targetMinStaleSec = _timeBetweenRefreshs / 1000.0;
-    _refresher = new NRTManagerReopenThread(_nrtManager, targetMinStaleSec * 10, targetMinStaleSec);
-    _refresher.setName("Refresh Thread [" + _table + "/" + _shard + "]");
-    _refresher.setDaemon(true);
-    _refresher.start();
-  }
-
-  private void startCommiter() {
-    _committer = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        while (!_isClosed.get()) {
-          try {
-            LOG.info("Committing of [{0}/{1}].", _table, _shard);
-            _recorder.commit(_writer);
-          } catch (CorruptIndexException e) {
-            LOG.error("Curruption Error during commit of [{0}/{1}].", e, _table, _shard);
-          } catch (IOException e) {
-            LOG.error("IO Error during commit of [{0}/{1}].", e, _table, _shard);
-          }
-          try {
-            Thread.sleep(_timeBetweenCommits);
-          } catch (InterruptedException e) {
-            if (_isClosed.get()) {
-              return;
-            }
-            LOG.error("Unknown error with committer thread [{0}/{1}].", e, _table, _shard);
-          }
-        }
-      }
-    });
-    _committer.setDaemon(true);
-    _committer.setName("Commit Thread [" + _table + "/" + _shard + "]");
-    _committer.start();
-  }
-
   @Override
   public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
     List<Record> records = row.records;
@@ -291,4 +257,41 @@ public class BlurNRTIndex extends BlurIndex {
     _gc = gc;
   }
 
-}
+  private void startRefresher() {
+    double targetMinStaleSec = _timeBetweenRefreshs / 1000.0;
+    _refresher = new NRTManagerReopenThread(_nrtManager, targetMinStaleSec * 10, targetMinStaleSec);
+    _refresher.setName("Refresh Thread [" + _table + "/" + _shard + "]");
+    _refresher.setDaemon(true);
+    _refresher.start();
+  }
+
+  private void startCommiter() {
+    _committer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!_isClosed.get()) {
+          try {
+            LOG.info("Committing of [{0}/{1}].", _table, _shard);
+            _recorder.commit(_writer);
+          } catch (CorruptIndexException e) {
+            LOG.error("Curruption Error during commit of [{0}/{1}].", e, _table, _shard);
+          } catch (IOException e) {
+            LOG.error("IO Error during commit of [{0}/{1}].", e, _table, _shard);
+          }
+          try {
+            Thread.sleep(_timeBetweenCommits);
+          } catch (InterruptedException e) {
+            if (_isClosed.get()) {
+              return;
+            }
+            LOG.error("Unknown error with committer thread [{0}/{1}].", e, _table, _shard);
+          }
+        }
+      }
+    });
+    _committer.setDaemon(true);
+    _committer.setName("Commit Thread [" + _table + "/" + _shard + "]");
+    _committer.start();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceCounter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceCounter.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceCounter.java
deleted file mode 100644
index 124b06d..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceCounter.java
+++ /dev/null
@@ -1,279 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-
-public class DirectoryReferenceCounter extends Directory {
-
-  private final static Log LOG = LogFactory.getLog(DirectoryReferenceCounter.class);
-  private Directory directory;
-  private Map<String, AtomicInteger> refs = new ConcurrentHashMap<String, AtomicInteger>();
-  private DirectoryReferenceFileGC gc;
-
-  public DirectoryReferenceCounter(Directory directory, DirectoryReferenceFileGC gc) {
-    this.directory = directory;
-    this.gc = gc;
-  }
-
-  public void deleteFile(String name) throws IOException {
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      deleteFile(name);
-      return;
-    }
-    AtomicInteger counter = refs.get(name);
-    if (counter != null && counter.get() > 0) {
-      addToFileGC(name);
-    } else {
-      LOG.debug("Delete file [{0}]", name);
-      directory.deleteFile(name);
-    }
-  }
-
-  private void addToFileGC(String name) {
-    if (gc != null) {
-      LOG.debug("Add file [{0}] to be GCed once refs are closed.", name);
-      gc.add(directory, name, refs);
-    }
-  }
-
-  public IndexOutput createOutput(String name) throws IOException {
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      return directory.createOutput(name);
-    }
-    LOG.debug("Create file [{0}]", name);
-    AtomicInteger counter = refs.get(name);
-    if (counter != null) {
-      LOG.error("Unknown error while trying to create ref counter for [{0}] reference exists.", name);
-      throw new IOException("Reference exists [" + name + "]");
-    }
-    counter = new AtomicInteger(0);
-    refs.put(name, counter);
-    return directory.createOutput(name);
-  }
-
-  public IndexInput openInput(String name) throws IOException {
-    IndexInput input = directory.openInput(name);
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      return input;
-    }
-    return wrap(name, input);
-  }
-
-  public IndexInput openInput(String name, int bufferSize) throws IOException {
-    IndexInput input = directory.openInput(name, bufferSize);
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      return input;
-    }
-    return wrap(name, input);
-  }
-
-  private IndexInput wrap(String name, IndexInput input) {
-    AtomicInteger counter = refs.get(name);
-    if (counter == null) {
-      counter = new AtomicInteger();
-      refs.put(name, counter);
-    }
-    return new RefIndexInput(input, counter);
-  }
-
-  @SuppressWarnings("deprecation")
-  public static class RefIndexInput extends IndexInput {
-
-    private IndexInput input;
-    private AtomicInteger ref;
-    private boolean closed = false;
-
-    public RefIndexInput(IndexInput input, AtomicInteger ref) {
-      this.input = input;
-      this.ref = ref;
-      ref.incrementAndGet();
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-      // Seems like not all the clones are being closed...
-      close();
-    }
-
-    public Object clone() {
-      RefIndexInput ref = (RefIndexInput) super.clone();
-      ref.input = (IndexInput) input.clone();
-      ref.ref.incrementAndGet();
-      return ref;
-    }
-
-    public void skipChars(int length) throws IOException {
-      input.skipChars(length);
-    }
-
-    public void setModifiedUTF8StringsMode() {
-      input.setModifiedUTF8StringsMode();
-    }
-
-    public void close() throws IOException {
-      if (!closed) {
-        input.close();
-        ref.decrementAndGet();
-        closed = true;
-      }
-    }
-
-    public short readShort() throws IOException {
-      return input.readShort();
-    }
-
-    public int readInt() throws IOException {
-      return input.readInt();
-    }
-
-    public void seek(long pos) throws IOException {
-      input.seek(pos);
-    }
-
-    public void copyBytes(IndexOutput out, long numBytes) throws IOException {
-      input.copyBytes(out, numBytes);
-    }
-
-    public int readVInt() throws IOException {
-      return input.readVInt();
-    }
-
-    public String toString() {
-      return input.toString();
-    }
-
-    public long readLong() throws IOException {
-      return input.readLong();
-    }
-
-    public long readVLong() throws IOException {
-      return input.readVLong();
-    }
-
-    public String readString() throws IOException {
-      return input.readString();
-    }
-
-    public long getFilePointer() {
-      return input.getFilePointer();
-    }
-
-    public byte readByte() throws IOException {
-      return input.readByte();
-    }
-
-    public void readBytes(byte[] b, int offset, int len) throws IOException {
-      input.readBytes(b, offset, len);
-    }
-
-    public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
-      input.readBytes(b, offset, len, useBuffer);
-    }
-
-    public long length() {
-      return input.length();
-    }
-
-    public void readChars(char[] buffer, int start, int length) throws IOException {
-      input.readChars(buffer, start, length);
-    }
-
-    public Map<String, String> readStringStringMap() throws IOException {
-      return input.readStringStringMap();
-    }
-
-  }
-
-  @SuppressWarnings("deprecation")
-  public void touchFile(String name) throws IOException {
-    directory.touchFile(name);
-  }
-
-  @SuppressWarnings("deprecation")
-  public void sync(String name) throws IOException {
-    directory.sync(name);
-  }
-
-  public void sync(Collection<String> names) throws IOException {
-    directory.sync(names);
-  }
-
-  public void clearLock(String name) throws IOException {
-    directory.clearLock(name);
-  }
-
-  public void close() throws IOException {
-    directory.close();
-  }
-
-  public void setLockFactory(LockFactory lockFactory) throws IOException {
-    directory.setLockFactory(lockFactory);
-  }
-
-  public String getLockID() {
-    return directory.getLockID();
-  }
-
-  public void copy(Directory to, String src, String dest) throws IOException {
-    directory.copy(to, src, dest);
-  }
-
-  public boolean fileExists(String name) throws IOException {
-    return directory.fileExists(name);
-  }
-
-  @SuppressWarnings("deprecation")
-  public long fileModified(String name) throws IOException {
-    return directory.fileModified(name);
-  }
-
-  public long fileLength(String name) throws IOException {
-    return directory.fileLength(name);
-  }
-
-  public LockFactory getLockFactory() {
-    return directory.getLockFactory();
-  }
-
-  public String[] listAll() throws IOException {
-    return directory.listAll();
-  }
-
-  public Lock makeLock(String name) {
-    return directory.makeLock(name);
-  }
-
-  public String toString() {
-    return directory.toString();
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceFileGC.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceFileGC.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceFileGC.java
deleted file mode 100644
index 0aa64f4..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceFileGC.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.lucene.store.Directory;
-
-
-public class DirectoryReferenceFileGC extends TimerTask {
-
-  private static final Log LOG = LogFactory.getLog(DirectoryReferenceFileGC.class);
-
-  private Timer _timer;
-  private long _delay = 5000;
-  private LinkedBlockingQueue<Value> _queue;
-
-  public static class Value {
-    public Value(Directory directory, String name, Map<String, AtomicInteger> refs) {
-      this.directory = directory;
-      this.name = name;
-      this.refs = refs;
-    }
-
-    Directory directory;
-    String name;
-    Map<String, AtomicInteger> refs;
-
-    public boolean tryToDelete() throws IOException {
-      AtomicInteger counter = refs.get(name);
-      if (counter.get() <= 0) {
-        refs.remove(name);
-        LOG.debug("Removing file [{0}]", name);
-        directory.deleteFile(name);
-        return true;
-      } else {
-        LOG.debug("File [{0}] had too many refs [{1}]", name, counter.get());
-      }
-      return false;
-    }
-  }
-
-  public void init() {
-    _timer = new Timer("Blur-File-GC", true);
-    _timer.scheduleAtFixedRate(this, _delay, _delay);
-    _queue = new LinkedBlockingQueue<Value>();
-  }
-
-  public void add(Directory directory, String name, Map<String, AtomicInteger> refs) {
-    try {
-      _queue.put(new Value(directory, name, refs));
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void close() {
-    _timer.purge();
-    _timer.cancel();
-  }
-
-  @Override
-  public void run() {
-    Iterator<Value> iterator = _queue.iterator();
-    while (iterator.hasNext()) {
-      Value value = iterator.next();
-      try {
-        if (value.tryToDelete()) {
-          iterator.remove();
-        }
-      } catch (IOException e) {
-        LOG.error("Unknown error", e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
index 9d8f1c7..7e69a8f 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -48,11 +48,11 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Index;
 import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.FieldType;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
 
-
 public class TransactionRecorder {
 
   enum TYPE {
@@ -80,7 +80,17 @@ public class TransactionRecorder {
   }
 
   private static final Log LOG = LogFactory.getLog(TransactionRecorder.class);
-  private static final Term ROW_ID = new Term(BlurConstants.ROW_ID);
+  private static FieldType ID_TYPE;
+
+  static {
+    ID_TYPE = new FieldType();
+    ID_TYPE.setIndexed(true);
+    ID_TYPE.setTokenized(false);
+    ID_TYPE.setOmitNorms(true);
+    ID_TYPE.setStored(true);
+    ID_TYPE.freeze();
+  }
+
   private AtomicBoolean running = new AtomicBoolean(true);
   private Path walPath;
   private Configuration configuration;
@@ -127,12 +137,12 @@ public class TransactionRecorder {
       switch (lookup) {
       case ROW:
         Row row = readRow(dataInputStream);
-        writer.updateDocuments(ROW_ID.createTerm(row.id), getDocs(row, analyzer));
+        writer.updateDocuments(createRowId(row.id), getDocs(row, analyzer));
         updateCount++;
         continue;
       case DELETE:
         String deleteRowId = readString(dataInputStream);
-        writer.deleteDocuments(ROW_ID.createTerm(deleteRowId));
+        writer.deleteDocuments(createRowId(deleteRowId));
         deleteCount++;
         continue;
       default:
@@ -273,7 +283,7 @@ public class TransactionRecorder {
         sync(baos.toByteArray());
       }
     }
-    Term term = ROW_ID.createTerm(row.id);
+    Term term = createRowId(row.id);
     List<Document> docs = getDocs(row, analyzer);
     return writer.updateDocuments(term, docs);
   }
@@ -289,7 +299,7 @@ public class TransactionRecorder {
         sync(baos.toByteArray());
       }
     }
-    return writer.deleteDocuments(ROW_ID.createTerm(rowId));
+    return writer.deleteDocuments(createRowId(rowId));
   }
 
   public void setWalPath(Path walPath) {
@@ -321,7 +331,7 @@ public class TransactionRecorder {
     for (int i = 0; i < size; i++) {
       Document document = convert(rowId, records.get(i), builder, analyzer);
       if (i == 0) {
-        document.add(BlurConstants.PRIME_DOC_FIELD);
+        document.add(new Field(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO, Index.NOT_ANALYZED_NO_NORMS));
       }
       docs.add(document);
     }
@@ -330,8 +340,8 @@ public class TransactionRecorder {
 
   public static Document convert(String rowId, Record record, StringBuilder builder, BlurAnalyzer analyzer) {
     Document document = new Document();
-    document.add(new Field(BlurConstants.ROW_ID, rowId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    document.add(new Field(BlurConstants.RECORD_ID, record.recordId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+    document.add(new Field(BlurConstants.ROW_ID, rowId, ID_TYPE));
+    document.add(new Field(BlurConstants.RECORD_ID, record.recordId, ID_TYPE));
     RowIndexWriter.addColumns(document, analyzer, builder, record.family, record.columns);
     return document;
   }
@@ -339,4 +349,9 @@ public class TransactionRecorder {
   public void setAnalyzer(BlurAnalyzer analyzer) {
     this.analyzer = analyzer;
   }
-}
+
+  private Term createRowId(String id) {
+    return new Term(BlurConstants.ROW_ID, id);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SnapshotIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SnapshotIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SnapshotIndexReader.java
deleted file mode 100644
index 171b3b6..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SnapshotIndexReader.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package org.apache.blur.manager.writer.lucene;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.lucene.index.FilterIndexReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermDocs;
-import org.apache.lucene.index.TermPositions;
-
-public class SnapshotIndexReader extends FilterIndexReader {
-
-  private final int maxDocs;
-  private final int numDocs;
-
-  public SnapshotIndexReader(IndexReader in, int maxDocs) {
-    super(in);
-    this.numDocs = in.numDocs();
-    this.maxDocs = maxDocs;
-  }
-
-  public static IndexReader wrap(IndexReader reader, int maxDocs) throws IOException {
-    IndexReader[] readers = reader.getSequentialSubReaders();
-    if (readers == null) {
-      return wrapInternal(reader, maxDocs);
-    } else {
-      IndexReader[] result = new IndexReader[readers.length];
-      for (int i = 0; i < readers.length; i++) {
-        result[i] = wrapInternal(readers[i], maxDocs);
-      }
-      return new MultiReader(result, false);
-    }
-  }
-
-  private static IndexReader wrapInternal(IndexReader reader, int maxDocs) throws IOException {
-    return new SnapshotIndexReader(reader, maxDocs);
-  }
-
-  @Override
-  public IndexReader[] getSequentialSubReaders() {
-    return null;
-  }
-
-  @Override
-  public int numDocs() {
-    return numDocs;
-  }
-
-  @Override
-  public int maxDoc() {
-    return maxDocs;
-  }
-
-  @Override
-  public TermDocs termDocs() throws IOException {
-    return new SnapshotTermDocs(in.termDocs(), maxDocs);
-  }
-
-  public TermDocs termDocs(Term term) throws IOException {
-    ensureOpen();
-    TermDocs termDocs = termDocs();
-    termDocs.seek(term);
-    return termDocs;
-  }
-
-  @Override
-  public TermPositions termPositions() throws IOException {
-    return new SnapshotTermPositions(in.termPositions(), maxDocs);
-  }
-
-  // public TermPositions termPositions(Term term) throws IOException {
-  // ensureOpen();
-  // TermPositions termPositions = termPositions();
-  // termPositions.seek(term);
-  // return termPositions;
-  // }
-
-  public static class SnapshotTermPositions extends FilterTermPositions {
-
-    private final int maxDocs;
-
-    public SnapshotTermPositions(TermPositions termPositions, int maxDocs) {
-      super(termPositions);
-      this.maxDocs = maxDocs;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      boolean next = super.next();
-      if (next) {
-        if (doc() >= maxDocs) {
-          return false;
-        }
-      }
-      return next;
-    }
-
-    @Override
-    public int read(int[] docs, int[] freqs) throws IOException {
-      int read = super.read(docs, freqs);
-      if (read == 0) {
-        return 0;
-      }
-      if (doc() >= maxDocs) {
-        return checkResults(docs, maxDocs);
-      }
-      return read;
-    }
-  }
-
-  public static class SnapshotTermDocs extends FilterTermDocs {
-
-    private final int maxDocs;
-
-    public SnapshotTermDocs(TermDocs termDocs, int maxDocs) {
-      super(termDocs);
-      this.maxDocs = maxDocs;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      boolean next = super.next();
-      if (next) {
-        if (doc() >= maxDocs) {
-          return false;
-        }
-      }
-      return next;
-    }
-
-    @Override
-    public int read(int[] docs, int[] freqs) throws IOException {
-      int read = super.read(docs, freqs);
-      if (read == 0) {
-        return 0;
-      }
-      if (doc() >= maxDocs) {
-        return checkResults(docs, maxDocs);
-      }
-      return read;
-    }
-  }
-
-  private static int checkResults(int[] docs, int maxDocs) {
-    int length = docs.length;
-    for (int i = 0; i < length; i++) {
-      if (docs[i] >= maxDocs) {
-        return i;
-      }
-    }
-    return length;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SoftDeleteIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SoftDeleteIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SoftDeleteIndexReader.java
deleted file mode 100644
index f81e619..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/lucene/SoftDeleteIndexReader.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package org.apache.blur.manager.writer.lucene;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.lucene.index.FilterIndexReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermDocs;
-import org.apache.lucene.index.TermPositions;
-import org.apache.lucene.util.BitVector;
-
-public class SoftDeleteIndexReader extends FilterIndexReader {
-
-  private final boolean baseHasDeletions;
-  private final BitVector deletes;
-  private final int deleteCount;
-  private final int numDocs;
-
-  public SoftDeleteIndexReader(IndexReader in, BitVector deletes) {
-    super(in);
-    this.baseHasDeletions = in.hasDeletions();
-    if (deletes == null) {
-      throw new RuntimeException("No deletes, use regular indexreader.");
-    }
-    this.deletes = deletes;
-    this.deleteCount = deletes.count();
-    this.numDocs = in.numDocs() - deleteCount;
-  }
-
-  public static IndexReader wrap(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
-    IndexReader[] readers = reader.getSequentialSubReaders();
-    if (readers == null) {
-      return wrapInternal(reader, deleteTerms);
-    } else {
-      IndexReader[] result = new IndexReader[readers.length];
-      for (int i = 0; i < readers.length; i++) {
-        result[i] = wrapInternal(readers[i], deleteTerms);
-      }
-      return new MultiReader(result, false);
-    }
-  }
-
-  private static IndexReader wrapInternal(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
-    BitVector deletes = getDeletes(reader, deleteTerms);
-    if (deletes == null) {
-      return reader;
-    }
-    return new SoftDeleteIndexReader(reader, deletes);
-  }
-
-  private static BitVector getDeletes(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
-    BitVector deletes = null;
-    TermDocs termDocs = reader.termDocs();
-    for (Term t : deleteTerms) {
-      termDocs.seek(t);
-      while (termDocs.next()) {
-        if (deletes == null) {
-          deletes = new BitVector(reader.maxDoc());
-        }
-        int doc = termDocs.doc();
-        deletes.set(doc);
-      }
-    }
-    termDocs.close();
-    return deletes;
-  }
-
-  @Override
-  public IndexReader[] getSequentialSubReaders() {
-    return null;
-  }
-
-  @Override
-  public int numDocs() {
-    return numDocs;
-  }
-
-  @Override
-  public boolean isDeleted(int n) {
-    if (baseHasDeletions && in.isDeleted(n)) {
-      return true;
-    }
-    return deletes.get(n);
-  }
-
-  @Override
-  public boolean hasDeletions() {
-    return baseHasDeletions;
-  }
-
-  @Override
-  public TermDocs termDocs() throws IOException {
-    return new SoftDeleteTermDocs(in.termDocs(), deletes);
-  }
-
-  public TermDocs termDocs(Term term) throws IOException {
-    ensureOpen();
-    TermDocs termDocs = termDocs();
-    termDocs.seek(term);
-    return termDocs;
-  }
-
-  @Override
-  public TermPositions termPositions() throws IOException {
-    return new SoftDeleteTermPositions(in.termPositions(), deletes);
-  }
-
-  // public TermPositions termPositions(Term term) throws IOException {
-  // ensureOpen();
-  // TermPositions termPositions = termPositions();
-  // termPositions.seek(term);
-  // return termPositions;
-  // }
-
-  public static class SoftDeleteTermPositions extends FilterTermPositions {
-
-    private BitVector deletes;
-
-    public SoftDeleteTermPositions(TermPositions termPositions, BitVector deletes) {
-      super(termPositions);
-      this.deletes = deletes;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      while (super.next()) {
-        if (!deletes.get(doc())) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public int read(int[] docs, int[] freqs) throws IOException {
-      int read = super.read(docs, freqs);
-      if (read == 0) {
-        return 0;
-      }
-      int validResults = removeDeletes(docs, freqs, read, deletes);
-      if (validResults == 0) {
-        return read(docs, freqs);
-      }
-      return validResults;
-    }
-  }
-
-  public static class SoftDeleteTermDocs extends FilterTermDocs {
-
-    private BitVector deletes;
-
-    public SoftDeleteTermDocs(TermDocs termDocs, BitVector deletes) {
-      super(termDocs);
-      this.deletes = deletes;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      while (super.next()) {
-        if (!deletes.get(doc())) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public int read(int[] docs, int[] freqs) throws IOException {
-      int read = super.read(docs, freqs);
-      if (read == 0) {
-        return 0;
-      }
-      int validResults = removeDeletes(docs, freqs, read, deletes);
-      if (validResults == 0) {
-        return read(docs, freqs);
-      }
-      return validResults;
-    }
-  }
-
-  private static int removeDeletes(int[] docs, int[] freqs, int validLength, BitVector deletes) {
-    int targetPosition = 0;
-    for (int i = 0; i < validLength; i++) {
-      int doc = docs[i];
-      if (!deletes.get(doc)) {
-        if (targetPosition != i) {
-          docs[targetPosition] = doc;
-          freqs[targetPosition] = freqs[i];
-        }
-        targetPosition++;
-      }
-    }
-    return targetPosition;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/server/Configurable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/Configurable.java b/src/blur-core/src/main/java/org/apache/blur/server/Configurable.java
new file mode 100644
index 0000000..01b9268
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/Configurable.java
@@ -0,0 +1,25 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public interface Configurable {
+
+  public void setTableContext(TableContext context);
+
+  public TableContext getTableContext();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/server/Configured.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/Configured.java b/src/blur-core/src/main/java/org/apache/blur/server/Configured.java
new file mode 100644
index 0000000..484a148
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/Configured.java
@@ -0,0 +1,42 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public abstract class Configured implements Configurable {
+
+  private TableContext context;
+
+  public Configured() {
+    this(null);
+  }
+
+  public Configured(TableContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public TableContext getTableContext() {
+    return context;
+  }
+
+  @Override
+  public void setTableContext(TableContext context) {
+    this.context = context;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/ShardContext.java b/src/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
new file mode 100644
index 0000000..0b0f808
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/ShardContext.java
@@ -0,0 +1,87 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Directory;
+
+public class ShardContext {
+
+  private String shard;
+  private Path walShardPath;
+  private Path hdfsDirPath;
+  private Directory directory;
+  private TableContext tableContext;
+
+  public TableContext getTableContext() {
+    return tableContext;
+  }
+
+  public void setTableContext(TableContext tableContext) {
+    this.tableContext = tableContext;
+  }
+
+  protected ShardContext() {
+
+  }
+
+  public Directory getDirectory() {
+    return directory;
+  }
+
+  public void setDirectory(Directory directory) {
+    this.directory = directory;
+  }
+
+  public Path getHdfsDirPath() {
+    return hdfsDirPath;
+  }
+
+  public void setHdfsDirPath(Path hdfsDirPath) {
+    this.hdfsDirPath = hdfsDirPath;
+  }
+
+  public String getShard() {
+    return shard;
+  }
+
+  public void setShard(String shard) {
+    this.shard = shard;
+  }
+
+  public Path getWalShardPath() {
+    return walShardPath;
+  }
+
+  public void setWalShardPath(Path walShardPath) {
+    this.walShardPath = walShardPath;
+  }
+
+  public static ShardContext create(TableContext tableContext, String shard) throws IOException {
+    ShardContext shardContext = new ShardContext();
+    shardContext.tableContext = tableContext;
+    shardContext.walShardPath = new Path(tableContext.getWalTablePath(), shard);
+    shardContext.hdfsDirPath = new Path(tableContext.getTablePath(), shard);
+    shardContext.shard = shard;
+    shardContext.directory = new HdfsDirectory(tableContext.getConfiguration(), shardContext.hdfsDirPath);
+    return shardContext;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/src/blur-core/src/main/java/org/apache/blur/server/TableContext.java
new file mode 100644
index 0000000..091f424
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -0,0 +1,165 @@
+package org.apache.blur.server;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.BLUR_SAHRD_INDEX_SIMILARITY;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_REFRESHS;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.ScoreType;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.similarities.DefaultSimilarity;
+import org.apache.lucene.search.similarities.Similarity;
+
+public class TableContext {
+
+
+  private static final Log LOG = LogFactory.getLog(TableContext.class);
+
+  private static final String LOGS = "logs";
+
+  private Path tablePath;
+  private Path walTablePath;
+  private BlurAnalyzer analyzer;
+  private String defaultFieldName;
+  private String table;
+  private IndexDeletionPolicy indexDeletionPolicy;
+  private Similarity similarity;
+  private Configuration configuration;
+  private TableDescriptor descriptor;
+  private long timeBetweenCommits;
+  private long timeBetweenRefreshs;
+  private ScoreType defaultScoreType;
+  private Term defaultPrimeDocTerm;
+
+  private static ConcurrentHashMap<String, TableContext> cache = new ConcurrentHashMap<String, TableContext>();
+
+  protected TableContext() {
+
+  }
+  
+  public static void clear() {
+    cache.clear();
+  }
+
+  public static TableContext create(TableDescriptor tableDescriptor) {
+    TableContext tableContext = cache.get(tableDescriptor.getName());
+    if (tableContext != null) {
+      return tableContext;
+    }
+    LOG.info("Creating table context for table [{0}]", tableDescriptor.getName());
+    Configuration configuration = new Configuration();
+//    Map<String, String> properties = tableDescriptor.getProperties();
+//    if (properties != null) {
+//      for (Entry<String, String> prop : properties.entrySet()) {
+//        configuration.set(prop.getKey(), prop.getValue());
+//      }
+//    }
+
+    tableContext = new TableContext();
+    tableContext.configuration = configuration;
+    tableContext.tablePath = new Path(tableDescriptor.getTableUri());
+    tableContext.walTablePath = new Path(tableContext.tablePath, LOGS);
+    tableContext.analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzerDefinition());
+//    tableContext.defaultFieldName = tableDescriptor.getDefaultFieldName();
+    tableContext.table = tableDescriptor.getName();
+    tableContext.descriptor = tableDescriptor;
+    tableContext.timeBetweenCommits = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_COMMITS, 60000);
+    tableContext.timeBetweenRefreshs = configuration.getLong(BLUR_SHARD_TIME_BETWEEN_REFRESHS, 5000);
+    tableContext.defaultPrimeDocTerm = new Term("_primedoc_", "true");
+    tableContext.defaultScoreType = ScoreType.SUPER;
+
+    Class<?> c1 = configuration.getClass(BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE, KeepOnlyLastCommitDeletionPolicy.class);
+    tableContext.indexDeletionPolicy = (IndexDeletionPolicy) configure(ReflectionUtils.newInstance(c1, configuration), tableContext);
+    Class<?> c2 = configuration.getClass(BLUR_SAHRD_INDEX_SIMILARITY, DefaultSimilarity.class);
+    tableContext.similarity = (Similarity) configure(ReflectionUtils.newInstance(c2, configuration), tableContext);
+    
+    cache.put(tableDescriptor.getName(), tableContext);
+    return tableContext;
+  }
+
+  private static Object configure(Object o, TableContext tableContext) {
+    if (o instanceof Configurable) {
+      ((Configurable) o).setTableContext(tableContext);
+    }
+    return o;
+  }
+
+  public IndexDeletionPolicy getIndexDeletionPolicy() {
+    return indexDeletionPolicy;
+  }
+
+  public Similarity getSimilarity() {
+    return similarity;
+  }
+
+  public long getTimeBetweenCommits() {
+    return timeBetweenCommits;
+  }
+
+  public long getTimeBetweenRefreshs() {
+    return timeBetweenRefreshs;
+  }
+
+  public BlurAnalyzer getAnalyzer() {
+    return analyzer;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public TableDescriptor getDescriptor() {
+    return descriptor;
+  }
+
+  public Path getTablePath() {
+    return tablePath;
+  }
+
+  public Path getWalTablePath() {
+    return walTablePath;
+  }
+
+  public String getDefaultFieldName() {
+    return defaultFieldName;
+  }
+
+  public Term getDefaultPrimeDocTerm() {
+    return defaultPrimeDocTerm;
+  }
+
+  public ScoreType getDefaultScoreType() {
+    return defaultScoreType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 55b6c60..0e83031 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -16,8 +16,8 @@ package org.apache.blur.thrift;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
 import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
@@ -27,9 +27,6 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIVE;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_CLASS;
@@ -42,8 +39,6 @@ import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
 import static org.apache.blur.utils.BlurUtil.quietClose;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
 import org.apache.blur.concurrent.ThreadWatcher;
@@ -57,16 +52,16 @@ import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
 import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.indexserver.BlurServerShutDown;
+import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
 import org.apache.blur.manager.indexserver.DistributedIndexServer;
-import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.metrics.BlurMetrics;
-import org.apache.blur.store.BufferStore;
 import org.apache.blur.store.blockcache.BlockCache;
 import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.BlockDirectoryCache;
 import org.apache.blur.store.blockcache.Cache;
+import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.zookeeper.ZkUtils;
@@ -132,11 +127,13 @@ public class ThriftBlurShardServer extends ThriftServer {
       LOG.info("Number of slabs of block cache [{0}] with direct memory allocation set to [{1}]", slabCount, directAllocation);
       LOG.info("Block cache target memory usage, slab size of [{0}] will allocate [{1}] slabs and use ~[{2}] bytes", slabSize, slabCount, ((long) slabCount * (long) slabSize));
 
-      BufferStore.init(configuration, blurMetrics);
-
+      int _1024Size = configuration.getInt("blur.shard.buffercache.1024", 8192);
+      int _8192Size = configuration.getInt("blur.shard.buffercache.8192", 8192);
+      BufferStore.init(_1024Size, _8192Size);
+      
       try {
         long totalMemory = (long) slabCount * (long) numberOfBlocksPerSlab * (long) blockSize;
-        blockCache = new BlockCache(blurMetrics, directAllocation, totalMemory, slabSize, blockSize);
+        blockCache = new BlockCache(directAllocation, totalMemory, slabSize);
       } catch (OutOfMemoryError e) {
         if ("Direct buffer memory".equals(e.getMessage())) {
           System.err
@@ -145,7 +142,7 @@ public class ThriftBlurShardServer extends ThriftServer {
         }
         throw e;
       }
-      cache = new BlockDirectoryCache(blockCache, blurMetrics);
+      cache = new BlockDirectoryCache(blockCache);
     } else {
       cache = BlockDirectory.NO_CACHE;
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
index cd0f0bc..73df878 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -25,9 +25,6 @@ import org.apache.blur.manager.results.BlurResultComparator;
 import org.apache.blur.manager.results.BlurResultPeekableIteratorComparator;
 import org.apache.blur.manager.results.PeekableIterator;
 import org.apache.blur.thrift.generated.BlurResult;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
-import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.index.Term;
 
 
@@ -72,6 +69,7 @@ public class BlurConstants {
   public static final String BLUR_SHARD_OPENER_THREAD_COUNT = "blur.shard.opener.thread.count";
   public static final String BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE = "blur.shard.index.deletion.policy.maxage";
   public static final String BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE = "blur.zookeeper.system.time.tolerance";
+  public static final String BLUR_SAHRD_INDEX_SIMILARITY = "blur.sahrd.index.similarity";
 
   public static final String BLUR_SHARD_TIME_BETWEEN_COMMITS = "blur.shard.time.between.commits";
   public static final String BLUR_SHARD_TIME_BETWEEN_REFRESHS = "blur.shard.time.between.refreshs";
@@ -102,7 +100,6 @@ public class BlurConstants {
   public static final long ZK_WAIT_TIME = TimeUnit.SECONDS.toMillis(5);
 
   public static final Term PRIME_DOC_TERM = new Term(PRIME_DOC, BlurConstants.PRIME_DOC_VALUE);
-  public static final Field PRIME_DOC_FIELD = new Field(PRIME_DOC, PRIME_DOC_VALUE, Store.YES, Index.NOT_ANALYZED_NO_NORMS);
 
   static {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index e0b4b9d..22cf6c4 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -25,7 +25,6 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -49,7 +48,7 @@ import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
 import org.apache.blur.manager.results.BlurResultIterable;
 import org.apache.blur.metrics.BlurMetrics;
 import org.apache.blur.metrics.BlurMetrics.MethodCall;
-import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurResult;
 import org.apache.blur.thrift.generated.BlurResults;
@@ -63,20 +62,20 @@ import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.RowMutationType;
 import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.thrift.generated.SimpleQuery;
-import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.search.Filter;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.util.PagedBytes.PagedBytesDataInput;
-import org.apache.lucene.util.packed.PackedInts;
-import org.apache.lucene.util.packed.PackedInts.Reader;
+import org.apache.lucene.index.SlowCompositeReaderWrapper;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
@@ -95,6 +94,7 @@ public class BlurUtil {
   private static final Class<?>[] EMPTY_PARAMETER_TYPES = new Class[] {};
   private static final Log LOG = LogFactory.getLog(BlurUtil.class);
   private static final String UNKNOWN = "UNKNOWN";
+  private static final int ALL = Integer.MAX_VALUE;
 
   @SuppressWarnings("unchecked")
   public static <T extends Iface> T recordMethodCallsAndAverageTimes(final BlurMetrics metrics, final T t, Class<T> clazz) {
@@ -369,69 +369,6 @@ public class BlurUtil {
     return results;
   }
 
-  public static Query readQuery(byte[] bs) throws BException {
-    return readObject(bs);
-  }
-
-  public static byte[] writeQuery(Query query) throws BException {
-    return writeObject(query);
-  }
-
-  public static Sort readSort(byte[] bs) throws BException {
-    return readObject(bs);
-  }
-
-  public static byte[] writeSort(Sort sort) throws BException {
-    return writeObject(sort);
-  }
-
-  public static Filter readFilter(byte[] bs) throws BException {
-    return readObject(bs);
-  }
-
-  public static byte[] writeFilter(Filter filter) throws BException {
-    return writeObject(filter);
-  }
-
-  private static byte[] writeObject(Serializable o) throws BException {
-    if (o == null) {
-      return null;
-    }
-    try {
-      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-      ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
-      outputStream.writeObject(o);
-      outputStream.close();
-      return byteArrayOutputStream.toByteArray();
-    } catch (IOException e) {
-      throw new BException("Unknown error", e);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> T readObject(byte[] bs) throws BException {
-    if (bs == null) {
-      return null;
-    }
-    ObjectInputStream inputStream = null;
-    try {
-      inputStream = new ObjectInputStream(new ByteArrayInputStream(bs));
-      return (T) inputStream.readObject();
-    } catch (IOException e) {
-      throw new BException("Unknown error", e);
-    } catch (ClassNotFoundException e) {
-      throw new BException("Unknown error", e);
-    } finally {
-      if (inputStream != null) {
-        try {
-          inputStream.close();
-        } catch (IOException e) {
-          throw new BException("Unknown error", e);
-        }
-      }
-    }
-  }
-
   public static void setStartTime(BlurQuery query) {
     if (query.startTime == 0) {
       query.startTime = System.currentTimeMillis();
@@ -543,82 +480,9 @@ public class BlurUtil {
     return seconds / TimeUnit.HOURS.toSeconds(1);
   }
 
-  @SuppressWarnings("unchecked")
   public static long getMemoryUsage(IndexReader r) {
-    try {
-      if (r instanceof SegmentReader) {
-        long size = 0;
-        SegmentReader segmentReader = (SegmentReader) r;
-        Object segmentCoreReaders = getSegmentCoreReaders(segmentReader);
-        Object termInfosReader = getTermInfosReader(segmentCoreReaders);
-        Object termInfosReaderIndex = getTermInfosReaderIndex(termInfosReader);
-        PagedBytesDataInput dataInput = getDataInput(termInfosReaderIndex);
-        PackedInts.Reader indexToDataOffset = getIndexToDataOffset(termInfosReaderIndex);
-
-        Object pagedBytes = BlurUtil.getField("this$0", dataInput);
-        List<byte[]> blocks = (List<byte[]>) BlurUtil.getField("blocks", pagedBytes);
-        for (byte[] block : blocks) {
-          size += block.length;
-        }
-
-        try {
-          Class<? extends Reader> clazz = indexToDataOffset.getClass();
-          Method method = clazz.getMethod("ramBytesUsed", EMPTY_PARAMETER_TYPES);
-          method.setAccessible(true);
-          Long ramBytesUsed = (Long) method.invoke(indexToDataOffset, EMPTY_OBJECT_ARRAY);
-          size += ramBytesUsed;
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-        return size;
-      }
-      IndexReader[] readers = r.getSequentialSubReaders();
-      long total = 0;
-      if (readers != null) {
-        for (IndexReader reader : readers) {
-          total += getMemoryUsage(reader);
-        }
-      }
-      return total;
-    } catch (Exception e) {
-      LOG.error("Unknown error during getMemoryUsage call", e);
-      return 0;
-    }
-  }
-
-  private static PackedInts.Reader getIndexToDataOffset(Object termInfosReaderIndex) {
-    return (Reader) getField("indexToDataOffset", termInfosReaderIndex);
-  }
-
-  private static PagedBytesDataInput getDataInput(Object termInfosReaderIndex) {
-    return (PagedBytesDataInput) getField("dataInput", termInfosReaderIndex);
-  }
-
-  private static Object getTermInfosReaderIndex(Object termInfosReader) {
-    return getField("index", termInfosReader);
-  }
-
-  private static Object getTermInfosReader(Object segmentCoreReaders) {
-    return getField("tis", segmentCoreReaders);
-  }
-
-  private static Object getSegmentCoreReaders(SegmentReader segmentReader) {
-    return getField("core", segmentReader, SegmentReader.class);
-  }
-
-  private static Object getField(String name, Object o) {
-    Class<? extends Object> clazz = o.getClass();
-    return getField(name, o, clazz);
-  }
-
-  private static Object getField(String name, Object o, Class<? extends Object> clazz) {
-    try {
-      Field field = clazz.getDeclaredField(name);
-      field.setAccessible(true);
-      return field.get(o);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    long sizeOf = RamUsageEstimator.sizeOf(r);
+    return sizeOf;
   }
 
   public static void createPath(ZooKeeper zookeeper, String path, byte[] data) throws KeeperException, InterruptedException {
@@ -655,7 +519,7 @@ public class BlurUtil {
       String name = path.getName();
       if (name.startsWith(SHARD_PREFIX)) {
         int index = name.indexOf('-');
-        String shardIndexStr = name.substring(index+1);
+        String shardIndexStr = name.substring(index + 1);
         int shardIndex = Integer.parseInt(shardIndexStr);
         if (shardIndex >= shardCount) {
           LOG.error("Number of directories in table path [" + path + "] exceeds definition of [" + shardCount + "] shard count.");
@@ -776,4 +640,33 @@ public class BlurUtil {
     recordMutation.setRecordMutationType(RecordMutationType.REPLACE_ENTIRE_RECORD);
     return recordMutation;
   }
+
+  /**
+   * NOTE: This is a potentially dangerous call, it will return all the
+   * documents that match the term.
+   * 
+   * @throws IOException
+   */
+  public static List<Document> termSearch(IndexReader reader, Term term, ResetableDocumentStoredFieldVisitor fieldSelector) throws IOException {
+    IndexSearcher indexSearcher = new IndexSearcher(reader);
+    TopDocs topDocs = indexSearcher.search(new TermQuery(term), ALL);
+    List<Document> docs = new ArrayList<Document>();
+    for (int i = 0; i < topDocs.totalHits; i++) {
+      int doc = topDocs.scoreDocs[i].doc;
+      indexSearcher.doc(doc, fieldSelector);
+      docs.add(fieldSelector.getDocument());
+      fieldSelector.reset();
+    }
+    return docs;
+  }
+
+  public static AtomicReader getAtomicReader(IndexReader reader) throws IOException {
+    return SlowCompositeReaderWrapper.wrap(reader);
+  }
+
+  public static int getShardIndex(String shard) {
+    int index = shard.indexOf('-');
+    return Integer.parseInt(shard.substring(index + 1));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java b/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java
index 4ca0e7d..f10502d 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/PrimeDocCache.java
@@ -22,9 +22,13 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexReader.ReaderClosedListener;
-import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.util.OpenBitSet;
 
 
@@ -54,13 +58,31 @@ public class PrimeDocCache {
         }
       });
       LOG.debug("Prime Doc BitSet missing for segment [" + reader + "] current size [" + primeDocMap.size() + "]");
-      bitSet = new OpenBitSet(reader.maxDoc());
-      primeDocMap.put(key, bitSet);
-      TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
-      while (termDocs.next()) {
-        bitSet.set(termDocs.doc());
-      }
-      termDocs.close();
+      final OpenBitSet bs = new OpenBitSet(reader.maxDoc());
+      primeDocMap.put(key, bs);
+      IndexSearcher searcher = new IndexSearcher(reader);
+      searcher.search(new TermQuery(BlurConstants.PRIME_DOC_TERM), new Collector() {
+        
+        @Override
+        public void setScorer(Scorer scorer) throws IOException {
+          
+        }
+        
+        @Override
+        public void setNextReader(AtomicReaderContext atomicReaderContext) throws IOException {
+        }
+        
+        @Override
+        public void collect(int doc) throws IOException {
+          bs.set(doc);
+        }
+        
+        @Override
+        public boolean acceptsDocsOutOfOrder() {
+          return false;
+        }
+      });
+      return bs;
     }
     return bitSet;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java b/src/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java
new file mode 100644
index 0000000..24fee3e
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java
@@ -0,0 +1,118 @@
+package org.apache.blur.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.StoredFieldVisitor;
+
+/** A {@link StoredFieldVisitor} that creates a {@link
+ *  Document} containing all stored fields, or only specific
+ *  requested fields provided to {@link #DocumentStoredFieldVisitor(Set)}.
+ *  <p>
+ *  This is used by {@link IndexReader#document(int)} to load a
+ *  document.
+ *
+ * @lucene.experimental */
+
+public class ResetableDocumentStoredFieldVisitor extends StoredFieldVisitor {
+  private Document doc = new Document();
+  private final Set<String> fieldsToAdd;
+
+  /** Load only fields named in the provided <code>Set&lt;String&gt;</code>. */
+  public ResetableDocumentStoredFieldVisitor(Set<String> fieldsToAdd) {
+    this.fieldsToAdd = fieldsToAdd;
+  }
+
+  /** Load only fields named in the provided <code>Set&lt;String&gt;</code>. */
+  public ResetableDocumentStoredFieldVisitor(String... fields) {
+    fieldsToAdd = new HashSet<String>(fields.length);
+    for(String field : fields) {
+      fieldsToAdd.add(field);
+    }
+  }
+
+  /** Load all stored fields. */
+  public ResetableDocumentStoredFieldVisitor() {
+    this.fieldsToAdd = null;
+  }
+
+  @Override
+  public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
+    doc.add(new StoredField(fieldInfo.name, value));
+  }
+
+  @Override
+  public void stringField(FieldInfo fieldInfo, String value) throws IOException {
+    final FieldType ft = new FieldType(TextField.TYPE_STORED);
+    ft.setStoreTermVectors(fieldInfo.hasVectors());
+    ft.setIndexed(fieldInfo.isIndexed());
+    ft.setOmitNorms(fieldInfo.omitsNorms());
+    ft.setIndexOptions(fieldInfo.getIndexOptions());
+    doc.add(new Field(fieldInfo.name, value, ft));
+  }
+
+  @Override
+  public void intField(FieldInfo fieldInfo, int value) {
+    doc.add(new StoredField(fieldInfo.name, value));
+  }
+
+  @Override
+  public void longField(FieldInfo fieldInfo, long value) {
+    doc.add(new StoredField(fieldInfo.name, value));
+  }
+
+  @Override
+  public void floatField(FieldInfo fieldInfo, float value) {
+    doc.add(new StoredField(fieldInfo.name, value));
+  }
+
+  @Override
+  public void doubleField(FieldInfo fieldInfo, double value) {
+    doc.add(new StoredField(fieldInfo.name, value));
+  }
+
+  @Override
+  public Status needsField(FieldInfo fieldInfo) throws IOException {
+    return fieldsToAdd == null || fieldsToAdd.contains(fieldInfo.name) ? Status.YES : Status.NO;
+  }
+
+  /**
+   * Retrieve the visited document.
+   * @return Document populated with stored fields. Note that only
+   *         the stored information in the field instances is valid,
+   *         data such as boosts, indexing options, term vector options,
+   *         etc is not set.
+   */
+  public Document getDocument() {
+    return doc;
+  }
+  
+  public void reset() {
+    doc = new Document();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
index 25bca7d..34f1955 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
@@ -26,7 +26,7 @@ import org.apache.blur.thrift.generated.FetchRecordResult;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.index.IndexableField;
 
 
 public class RowDocumentUtil {
@@ -70,7 +70,7 @@ public class RowDocumentUtil {
   public static String readRecord(Document document, ReaderBlurRecord reader) {
     String rowId = null;
     String family = null;
-    for (Fieldable field : document.getFields()) {
+    for (IndexableField field : document.getFields()) {
       if (field.name().equals(ROW_ID)) {
         rowId = field.stringValue();
       } else if (field.name().equals(RECORD_ID)) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/utils/RowIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/RowIndexWriter.java b/src/blur-core/src/main/java/org/apache/blur/utils/RowIndexWriter.java
index 314c399..9b98a46 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/RowIndexWriter.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/RowIndexWriter.java
@@ -94,7 +94,7 @@ public class RowIndexWriter {
     document.add(new Field(RECORD_ID, recordId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
     if (addColumns(document, _analyzer, builder, family, record.columns)) {
       if (!primeDocSet) {
-        document.add(BlurConstants.PRIME_DOC_FIELD);
+        document.add(new Field(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO, Index.NOT_ANALYZED_NO_NORMS));
         primeDocSet = true;
       }
       documents.add(document);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a0b462f/src/blur-core/src/main/java/org/apache/blur/utils/TermDocIterable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/TermDocIterable.java b/src/blur-core/src/main/java/org/apache/blur/utils/TermDocIterable.java
index 8312360..e47cba8 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/TermDocIterable.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/TermDocIterable.java
@@ -20,18 +20,27 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.FieldSelector;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.util.Bits;
 
 public class TermDocIterable implements Iterable<Document> {
 
-  private TermDocs termDocs;
-  private IndexReader reader;
-  private FieldSelector fieldSelector;
+  private DocsEnum docsEnum;
+  private AtomicReader reader;
+  private ResetableDocumentStoredFieldVisitor fieldSelector;
 
-  public TermDocIterable(TermDocs termDocs, IndexReader reader, FieldSelector fieldSelector) {
-    this.termDocs = termDocs;
+  public TermDocIterable(DocsEnum docsEnum, AtomicReader reader) {
+    this(docsEnum, reader, new ResetableDocumentStoredFieldVisitor());
+  }
+
+  public TermDocIterable(DocsEnum docsEnum, AtomicReader reader, ResetableDocumentStoredFieldVisitor fieldSelector) {
+    if (docsEnum == null) {
+      throw new NullPointerException("docsEnum can not be null.");
+    }
+    this.docsEnum = docsEnum;
     this.reader = reader;
     this.fieldSelector = fieldSelector;
   }
@@ -66,23 +75,24 @@ public class TermDocIterable implements Iterable<Document> {
   }
 
   private Document getDoc() throws IOException {
-    return reader.document(termDocs.doc(), fieldSelector);
+    fieldSelector.reset();
+    reader.document(docsEnum.docID(), fieldSelector);
+    return fieldSelector.getDocument();
   }
 
   private boolean getNext() {
     try {
-      boolean next = termDocs.next();
-      if (!next) {
-        termDocs.close();
-        return next;
+      int next = docsEnum.nextDoc();
+      if (next == DocIdSetIterator.NO_MORE_DOCS) {
+        return false;
       }
-      while (reader.isDeleted(termDocs.doc())) {
-        next = termDocs.next();
-      }
-      if (!next) {
-        termDocs.close();
+      Bits liveDocs = MultiFields.getLiveDocs(reader);
+      if (liveDocs != null) {
+        while (!liveDocs.get(docsEnum.docID())) {
+          next = docsEnum.nextDoc();
+        }
       }
-      return next;
+      return next == DocIdSetIterator.NO_MORE_DOCS ? false : true;
     } catch (IOException e) {
       throw new RuntimeException(e);
     }


Mime
View raw message