incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [02/13] git commit: Second patch of updates.
Date Tue, 30 Aug 2016 01:57:47 GMT
Second patch of updates.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/01416562
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/01416562
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/01416562

Branch: refs/heads/master
Commit: 014165621b11c6c1814d5233baae9f85f99c6ae6
Parents: 0e8d0e8
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat May 7 13:10:19 2016 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat May 7 13:10:19 2016 -0400

----------------------------------------------------------------------
 .../apache/blur/command/BaseCommandManager.java |  23 ++-
 .../java/org/apache/blur/command/Command.java   |   2 +-
 .../blur/command/ControllerClusterContext.java  |   6 +-
 .../org/apache/blur/command/ResponseFuture.java |   4 +
 .../blur/manager/stats/MergerTableStats.java    |   2 +-
 .../manager/writer/BlurIndexSimpleWriter.java   |  72 ++++----
 .../blur/manager/writer/IndexImporter.java      | 130 +++++++++++---
 .../writer/SnapshotIndexDeletionPolicy.java     |  81 +++++----
 .../blur/thrift/BlurControllerServer.java       |  80 +++------
 .../org/apache/blur/thrift/BlurShardServer.java |   6 +-
 .../blur/thrift/ThriftBlurShardServer.java      |   2 +-
 .../java/org/apache/blur/utils/GCWatcher.java   |  28 ++-
 .../org/apache/blur/utils/GCWatcherJdk6.java    |   2 +-
 .../blur/manager/writer/IndexImporterTest.java  |  17 ++
 .../index/FilterAccessControlFactory.java       |   1 +
 ...etDocumentVisibilityFilterCacheStrategy.java | 172 ++++++++++++++++++-
 16 files changed, 448 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
index ad542ef..be92e34 100644
--- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
@@ -134,19 +134,20 @@ public abstract class BaseCommandManager implements Closeable {
   }
 
   public CommandStatus getCommandStatus(String commandExecutionId) {
-    CommandStatus cso = findCommandStatusObject(commandExecutionId, _workerRunningMap.values());
-    if (cso != null) {
-      return cso;
-    }
-    return findCommandStatusObject(commandExecutionId, _driverRunningMap.values());
+    CommandStatus cso1 = findCommandStatusObject(commandExecutionId, _workerRunningMap.values());
+    CommandStatus cso2 = findCommandStatusObject(commandExecutionId, _driverRunningMap.values());
+    return CommandStatusUtil.mergeCommandStatus(cso1, cso2);
   }
 
   private CommandStatus findCommandStatusObject(String commandExecutionId, Collection<ResponseFuture<?>> values) {
     Map<String, Map<CommandStatusState, Long>> serverStateMap = new HashMap<String, Map<CommandStatusState, Long>>();
     CommandStatus commandStatus = null;
     for (ResponseFuture<?> responseFuture : values) {
+      if (responseFuture == null) {
+        continue;
+      }
       Command<?> commandExecuting = responseFuture.getCommandExecuting();
-      if (commandExecuting.getCommandExecutionId().equals(commandExecutionId)) {
+      if (commandExecutionId.equals(commandExecuting.getCommandExecutionId())) {
         if (commandStatus == null) {
           CommandStatus originalCommandStatusObject = responseFuture.getOriginalCommandStatusObject();
           String commandName = responseFuture.getCommandExecuting().getName();
@@ -182,7 +183,10 @@ public abstract class BaseCommandManager implements Closeable {
     List<String> result = new ArrayList<String>();
     for (ResponseFuture<?> responseFuture : values) {
       Command<?> commandExecuting = responseFuture.getCommandExecuting();
-      result.add(commandExecuting.getCommandExecutionId());
+      String commandExecutionId = commandExecuting.getCommandExecutionId();
+      if (commandExecutionId != null) {
+        result.add(commandExecutionId);
+      }
     }
     return result;
   }
@@ -400,11 +404,12 @@ public abstract class BaseCommandManager implements Closeable {
   }
 
   protected Response submitDriverCallable(Callable<Response> callable, Command<?> commandExecuting,
-      CommandStatus originalCommandStatusObject, AtomicBoolean running) throws IOException, TimeoutException, ExceptionCollector {
+      CommandStatus originalCommandStatusObject, AtomicBoolean running) throws IOException, TimeoutException,
+      ExceptionCollector {
     Future<Response> future = _executorServiceDriver.submit(callable);
     Long instanceExecutionId = getInstanceExecutionId();
     _driverRunningMap.put(instanceExecutionId, new ResponseFuture<Response>(_runningCacheTombstoneTime, future,
-        commandExecuting, originalCommandStatusObject,running));
+        commandExecuting, originalCommandStatusObject, running));
     try {
       return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
     } catch (CancellationException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/command/Command.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/Command.java b/blur-core/src/main/java/org/apache/blur/command/Command.java
index 9cf2719..ff6f559 100644
--- a/blur-core/src/main/java/org/apache/blur/command/Command.java
+++ b/blur-core/src/main/java/org/apache/blur/command/Command.java
@@ -30,7 +30,7 @@ import org.apache.blur.thrift.generated.Blur.Iface;
 
 public abstract class Command<R> implements Cloneable {
 
-  @OptionalArgument("The ")
+  @OptionalArgument
   private String commandExecutionId;
 
   public abstract String getName();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
index b7a1a63..59f5b7c 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -23,6 +23,7 @@ import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.BlurClientManager;
 import org.apache.blur.thrift.ClientPool;
 import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.UserConverter;
 import org.apache.blur.thrift.generated.Arguments;
 import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.BlurException;
@@ -31,6 +32,7 @@ import org.apache.blur.thrift.generated.Response;
 import org.apache.blur.thrift.generated.TimeoutException;
 import org.apache.blur.thrift.generated.ValueObject;
 import org.apache.blur.trace.Tracer;
+import org.apache.blur.user.UserContext;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -134,7 +136,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
 
     final Arguments arguments = _manager.toArguments(command);
 
-    CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null);
+    CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, UserConverter.toThriftUser(UserContext.getUser()));
     for (Entry<Server, Client> e : clientMap.entrySet()) {
       Server server = e.getKey();
       final Client client = e.getValue();
@@ -226,7 +228,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     Set<Shard> shards = command.routeShards(this, tables);
     Map<Server, Client> clientMap = getClientMap(command, tables, shards);
     final Arguments arguments = _manager.toArguments(command);
-    CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null);
+    CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, UserConverter.toThriftUser(UserContext.getUser()));
     for (Entry<Server, Client> e : clientMap.entrySet()) {
       Server server = e.getKey();
       final Client client = e.getValue();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
index a5a629e..ef4a046 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
@@ -16,6 +16,7 @@
  */
 package org.apache.blur.command;
 
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +40,9 @@ public class ResponseFuture<T> implements Future<T> {
     _tombstone = tombstone;
     _future = future;
     _commandExecuting = commandExecuting;
+    if (_commandExecuting.getCommandExecutionId() == null) {
+      _commandExecuting.setCommandExecutionId(UUID.randomUUID().toString());
+    }
     _originalCommandStatusObject = originalCommandStatusObject;
     _running = running;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java b/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
index 2f89c9e..4ac5631 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
@@ -46,7 +46,7 @@ public class MergerTableStats implements Merger<TableStats> {
 
   private TableStats merge(TableStats s1, TableStats s2) {
     s1.tableName = s2.tableName;
-    s1.bytes = Math.max(s1.bytes, s2.bytes);
+    s1.bytes = s1.bytes + s2.bytes;
     s1.recordCount = s1.recordCount + s2.recordCount;
     s1.rowCount = s1.rowCount + s2.rowCount;
     s1.segmentImportInProgressCount = s1.segmentImportInProgressCount + s2.segmentImportInProgressCount;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index e21a952..ff17e27 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -25,7 +25,6 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WRITER_SORT_M
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH;
 
 import java.io.Closeable;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -77,6 +76,7 @@ import org.apache.blur.user.User;
 import org.apache.blur.user.UserContext;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -428,31 +428,31 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   }
 
   private void closeWriter() {
-    if (_lastWrite.get() + _maxWriterIdle < System.currentTimeMillis()) {
-      synchronized (_writer) {
-        _writeLock.lock();
-        try {
-          BlurIndexWriter writer = _writer.getAndSet(null);
-          if (writer != null) {
-            LOG.info("Closing idle writer for table [{0}] shard [{1}]", _tableContext.getTable(),
-                _shardContext.getShard());
-            IOUtils.cleanup(LOG, writer);
-          }
-        } finally {
-          _writeLock.unlock();
+    _writeLock.lock();
+    try {
+      if (_lastWrite.get() + _maxWriterIdle < System.currentTimeMillis()) {
+        BlurIndexWriter writer = _writer.getAndSet(null);
+        if (writer != null) {
+          LOG.info("Closing idle writer for table [{0}] shard [{1}]", _tableContext.getTable(),
+              _shardContext.getShard());
+          IOUtils.cleanup(LOG, writer);
         }
       }
+    } finally {
+      _writeLock.unlock();
     }
   }
 
+  /**
+   * Testing only.
+   */
   protected boolean isWriterClosed() {
-    synchronized (_writer) {
-      return _writer.get() == null;
-    }
+    return _writer.get() == null;
   }
 
   private BlurIndexWriter getBlurIndexWriter() throws IOException {
-    synchronized (_writer) {
+    _writeLock.lock();
+    try {
       BlurIndexWriter blurIndexWriter = _writer.get();
       if (blurIndexWriter == null) {
         blurIndexWriter = new BlurIndexWriter(_directory, _conf.clone());
@@ -460,12 +460,17 @@ public class BlurIndexSimpleWriter extends BlurIndex {
         _lastWrite.set(System.currentTimeMillis());
       }
       return blurIndexWriter;
+    } finally {
+      _writeLock.unlock();
     }
   }
 
   private void resetBlurIndexWriter() {
-    synchronized (_writer) {
+    _writeLock.lock();
+    try {
       _writer.set(null);
+    } finally {
+      _writeLock.unlock();
     }
   }
 
@@ -501,22 +506,12 @@ public class BlurIndexSimpleWriter extends BlurIndex {
 
   @Override
   public void createSnapshot(String name) throws IOException {
-    _writeLock.lock();
-    try {
-      _snapshotIndexDeletionPolicy.createSnapshot(name, _indexReader.get(), _context);
-    } finally {
-      _writeLock.unlock();
-    }
+    _snapshotIndexDeletionPolicy.createSnapshot(name, _indexReader.get(), _context);
   }
 
   @Override
   public void removeSnapshot(String name) throws IOException {
-    _writeLock.lock();
-    try {
-      _snapshotIndexDeletionPolicy.removeSnapshot(name, _context);
-    } finally {
-      _writeLock.unlock();
-    }
+    _snapshotIndexDeletionPolicy.removeSnapshot(name, _context);
   }
 
   @Override
@@ -1024,17 +1019,10 @@ public class BlurIndexSimpleWriter extends BlurIndex {
 
   @Override
   public long getOnDiskSize() throws IOException {
-    long total = 0;
-    String[] listAll = _directory.listAll();
-    for (String name : listAll) {
-      try {
-        total += _directory.fileLength(name);
-      } catch (FileNotFoundException e) {
-        // If file is not found that means that is was removed between the time
-        // we started iterating over the file names and when we asked for it's
-        // size.
-      }
-    }
-    return total;
+    Path hdfsDirPath = _shardContext.getHdfsDirPath();
+    Configuration configuration = _tableContext.getConfiguration();
+    FileSystem fileSystem = hdfsDirPath.getFileSystem(configuration);
+    ContentSummary contentSummary = fileSystem.getContentSummary(hdfsDirPath);
+    return contentSummary.getLength();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index 42171e8..33db0ae 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -36,6 +36,7 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.search.IndexSearcherCloseable;
 import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.MergeSortRowIdLookup.Action;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.server.cache.ThriftCache;
@@ -54,12 +55,16 @@ import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.CompositeReaderContext;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.DocsEnum;
 import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 
 public class IndexImporter extends TimerTask implements Closeable {
@@ -292,7 +297,9 @@ public class IndexImporter extends TimerTask implements Closeable {
       public void performMutate(IndexSearcherCloseable searcher, IndexWriter writer) throws IOException {
         LOG.info("About to import [{0}] into [{1}/{2}]", directory, _shard, _table);
         boolean emitDeletes = searcher.getIndexReader().numDocs() != 0;
-        applyDeletes(directory, writer, _shard, emitDeletes);
+        Configuration configuration = _shardContext.getTableContext().getConfiguration();
+
+        applyDeletes(directory, writer, searcher, _shard, emitDeletes, configuration);
         LOG.info("Add index [{0}] [{1}/{2}]", directory, _shard, _table);
         writer.addIndexes(directory);
         LOG.info("Removing delete markers [{0}] on [{1}/{2}]", directory, _shard, _table);
@@ -336,40 +343,113 @@ public class IndexImporter extends TimerTask implements Closeable {
     return result;
   }
 
-  private void applyDeletes(Directory directory, IndexWriter indexWriter, String shard, boolean emitDeletes)
-      throws IOException {
-    DirectoryReader reader = DirectoryReader.open(directory);
+  private void applyDeletes(Directory directory, IndexWriter indexWriter, IndexSearcherCloseable searcher,
+      String shard, boolean emitDeletes, Configuration configuration) throws IOException {
+    DirectoryReader newReader = DirectoryReader.open(directory);
     try {
-      LOG.info("Applying deletes in reader [{0}]", reader);
-      CompositeReaderContext compositeReaderContext = reader.getContext();
-      List<AtomicReaderContext> leaves = compositeReaderContext.leaves();
+      List<AtomicReaderContext> newLeaves = newReader.getContext().leaves();
       BlurPartitioner blurPartitioner = new BlurPartitioner();
       Text key = new Text();
       int numberOfShards = _shardContext.getTableContext().getDescriptor().getShardCount();
       int shardId = ShardUtil.getShardIndex(shard);
-      for (AtomicReaderContext context : leaves) {
-        AtomicReader atomicReader = context.reader();
-        Fields fields = atomicReader.fields();
-        Terms terms = fields.terms(BlurConstants.ROW_ID);
-        if (terms != null) {
-          TermsEnum termsEnum = terms.iterator(null);
-          BytesRef ref = null;
-          while ((ref = termsEnum.next()) != null) {
-            key.set(ref.bytes, ref.offset, ref.length);
-            int partition = blurPartitioner.getPartition(key, null, numberOfShards);
-            if (shardId != partition) {
-              throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition
-                  + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly.");
-            }
-            if (emitDeletes) {
-              indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref)));
-            }
+
+      Action action = new Action() {
+        @Override
+        public void found(AtomicReader reader, Bits liveDocs, TermsEnum termsEnum) throws IOException {
+          DocsEnum docsEnum = termsEnum.docs(liveDocs, null);
+          if (docsEnum.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+            indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(termsEnum.term())));
           }
         }
+      };
+
+      LOG.info("Applying deletes for table [{0}] shard [{1}] new reader [{2}]", _table, shard, newReader);
+      boolean skipCheckRowIds = isInternal(newReader);
+      LOG.info("Skip rowid check [{0}] for table [{1}] shard [{2}] new reader [{3}]", skipCheckRowIds, _table, shard,
+          newReader);
+      for (AtomicReaderContext context : newLeaves) {
+        AtomicReader newAtomicReader = context.reader();
+        if (isFastRowIdDeleteSupported(newAtomicReader)) {
+          runNewRowIdCheckAndDelete(indexWriter, emitDeletes, blurPartitioner, key, numberOfShards, shardId,
+              newAtomicReader, skipCheckRowIds);
+        } else {
+          runOldMergeSortRowIdCheckAndDelete(emitDeletes, searcher.getIndexReader(), blurPartitioner, key,
+              numberOfShards, shardId, action, newAtomicReader);
+        }
       }
     } finally {
-      reader.close();
+      newReader.close();
+    }
+  }
+
+  private boolean isInternal(DirectoryReader reader) throws IOException {
+    Map<String, String> map = reader.getIndexCommit().getUserData();
+    return BlurConstants.INTERNAL.equals(map.get(BlurConstants.INTERNAL));
+  }
+
+  private void runNewRowIdCheckAndDelete(IndexWriter indexWriter, boolean emitDeletes, BlurPartitioner blurPartitioner,
+      Text key, int numberOfShards, int shardId, AtomicReader atomicReader, boolean skipCheckRowIds) throws IOException {
+    Fields fields = atomicReader.fields();
+    if (skipCheckRowIds) {
+      Terms rowIdTerms = fields.terms(BlurConstants.ROW_ID);
+      if (rowIdTerms != null) {
+        LOG.info("Checking rowIds for import on table [{0}] shard [{1}]", _table, _shard);
+        TermsEnum rowIdTermsEnum = rowIdTerms.iterator(null);
+        BytesRef ref = null;
+        while ((ref = rowIdTermsEnum.next()) != null) {
+          key.set(ref.bytes, ref.offset, ref.length);
+          int partition = blurPartitioner.getPartition(key, null, numberOfShards);
+          if (shardId != partition) {
+            throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition
+                + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly.");
+          }
+        }
+      }
+    }
+    if (emitDeletes) {
+      Terms rowIdsToDeleteTerms = fields.terms(BlurConstants.UPDATE_ROW);
+      if (rowIdsToDeleteTerms != null) {
+        LOG.info("Performing deletes on rowIds for import on table [{0}] shard [{1}]", _table, _shard);
+        TermsEnum rowIdsToDeleteTermsEnum = rowIdsToDeleteTerms.iterator(null);
+        BytesRef ref = null;
+        while ((ref = rowIdsToDeleteTermsEnum.next()) != null) {
+          indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref)));
+        }
+      }
+    }
+  }
+
+  private void runOldMergeSortRowIdCheckAndDelete(boolean emitDeletes, IndexReader currentIndexReader,
+      BlurPartitioner blurPartitioner, Text key, int numberOfShards, int shardId, Action action,
+      AtomicReader atomicReader) throws IOException {
+    MergeSortRowIdLookup lookup = new MergeSortRowIdLookup(currentIndexReader);
+    Fields fields = atomicReader.fields();
+    Terms terms = fields.terms(BlurConstants.ROW_ID);
+    if (terms != null) {
+      TermsEnum termsEnum = terms.iterator(null);
+      BytesRef ref = null;
+      while ((ref = termsEnum.next()) != null) {
+        key.set(ref.bytes, ref.offset, ref.length);
+        int partition = blurPartitioner.getPartition(key, null, numberOfShards);
+        if (shardId != partition) {
+          throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition
+              + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly.");
+        }
+        if (emitDeletes) {
+          lookup.lookup(ref, action);
+        }
+      }
+    }
+  }
+
+  private boolean isFastRowIdDeleteSupported(AtomicReader atomicReader) throws IOException {
+    if (atomicReader.fields().terms(BlurConstants.NEW_ROW) != null) {
+      return true;
+    }
+    if (atomicReader.fields().terms(BlurConstants.UPDATE_ROW) != null) {
+      return true;
     }
+    return false;
   }
 
   public void cleanupOldDirs() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
index 30690e5..15d9272 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
@@ -28,6 +28,8 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -54,6 +56,7 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy {
   private final Path _path;
   private final Map<String, Long> _namesToGenerations = new ConcurrentHashMap<String, Long>();
   private final Map<Long, Set<String>> _generationsToNames = new ConcurrentHashMap<Long, Set<String>>();
+  private final WriteLock _writeLock = new ReentrantReadWriteLock().writeLock();
 
   public SnapshotIndexDeletionPolicy(Configuration configuration, Path path) throws IOException {
     _configuration = configuration;
@@ -70,13 +73,18 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy {
 
   @Override
   public void onCommit(List<? extends IndexCommit> commits) throws IOException {
-    int size = commits.size();
-    for (int i = 0; i < size - 1; i++) {
-      IndexCommit indexCommit = commits.get(i);
-      long generation = indexCommit.getGeneration();
-      if (!_generationsToNames.containsKey(generation)) {
-        indexCommit.delete();
+    _writeLock.lock();
+    try {
+      int size = commits.size();
+      for (int i = 0; i < size - 1; i++) {
+        IndexCommit indexCommit = commits.get(i);
+        long generation = indexCommit.getGeneration();
+        if (!_generationsToNames.containsKey(generation)) {
+          indexCommit.delete();
+        }
       }
+    } finally {
+      _writeLock.unlock();
     }
   }
 
@@ -147,36 +155,46 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy {
   }
 
   public void createSnapshot(String name, DirectoryReader reader, String context) throws IOException {
-    if (_namesToGenerations.containsKey(name)) {
-      throw new IOException("Snapshot [" + name + "] already exists.");
-    }
-    LOG.info("Creating snapshot [{0}] in [{1}].", name, context);
-    IndexCommit indexCommit = reader.getIndexCommit();
-    long generation = indexCommit.getGeneration();
-    _namesToGenerations.put(name, generation);
-    Set<String> names = _generationsToNames.get(generation);
-    if (names == null) {
-      names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-      _generationsToNames.put(generation, names);
+    _writeLock.lock();
+    try {
+      if (_namesToGenerations.containsKey(name)) {
+        throw new IOException("Snapshot [" + name + "] already exists.");
+      }
+      LOG.info("Creating snapshot [{0}] in [{1}].", name, context);
+      IndexCommit indexCommit = reader.getIndexCommit();
+      long generation = indexCommit.getGeneration();
+      _namesToGenerations.put(name, generation);
+      Set<String> names = _generationsToNames.get(generation);
+      if (names == null) {
+        names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+        _generationsToNames.put(generation, names);
+      }
+      names.add(name);
+      storeGenerations();
+    } finally {
+      _writeLock.unlock();
     }
-    names.add(name);
-    storeGenerations();
   }
 
   public void removeSnapshot(String name, String context) throws IOException {
-    Long gen = _namesToGenerations.get(name);
-    if (gen == null) {
-      LOG.info("Snapshot [{0}] does not exist in [{1}].", name, context);
-      return;
-    }
-    LOG.info("Removing snapshot [{0}] from [{1}].", name, context);
-    _namesToGenerations.remove(name);
-    Set<String> names = _generationsToNames.get(gen);
-    names.remove(name);
-    if (names.isEmpty()) {
-      _generationsToNames.remove(gen);
+    _writeLock.lock();
+    try {
+      Long gen = _namesToGenerations.get(name);
+      if (gen == null) {
+        LOG.info("Snapshot [{0}] does not exist in [{1}].", name, context);
+        return;
+      }
+      LOG.info("Removing snapshot [{0}] from [{1}].", name, context);
+      _namesToGenerations.remove(name);
+      Set<String> names = _generationsToNames.get(gen);
+      names.remove(name);
+      if (names.isEmpty()) {
+        _generationsToNames.remove(gen);
+      }
+      storeGenerations();
+    } finally {
+      _writeLock.unlock();
     }
-    storeGenerations();
   }
 
   public Collection<String> getSnapshots() {
@@ -194,5 +212,4 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy {
   public static Path getGenerationsPath(Path shardDir) {
     return new Path(shardDir, "generations");
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 477c923..e4d29e0 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.blur.command.ArgumentOverlay;
 import org.apache.blur.command.BlurObject;
 import org.apache.blur.command.BlurObjectSerDe;
+import org.apache.blur.command.CommandStatusUtil;
 import org.apache.blur.command.CommandUtil;
 import org.apache.blur.command.ControllerCommandManager;
 import org.apache.blur.command.Response;
@@ -92,7 +93,6 @@ import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.ColumnDefinition;
 import org.apache.blur.thrift.generated.CommandDescriptor;
 import org.apache.blur.thrift.generated.CommandStatus;
-import org.apache.blur.thrift.generated.CommandStatusState;
 import org.apache.blur.thrift.generated.ErrorType;
 import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.HighlightOptions;
@@ -108,6 +108,7 @@ import org.apache.blur.thrift.generated.User;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Trace.TraceId;
 import org.apache.blur.trace.Tracer;
+import org.apache.blur.user.UserContext;
 import org.apache.blur.utils.BlurExecutorCompletionService;
 import org.apache.blur.utils.BlurIterator;
 import org.apache.blur.utils.BlurUtil;
@@ -1514,7 +1515,8 @@ public class BlurControllerServer extends TableAdmin implements Iface {
       throws BlurException, TException {
     try {
       BlurObject args = CommandUtil.toBlurObject(arguments);
-      CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null, null);
+      CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null,
+          UserConverter.toThriftUser(UserContext.getUser()));
       Response response = _commandManager.execute(getTableContextFactory(), getLayoutFactory(), commandName,
           new ArgumentOverlay(args, _serDe), originalCommandStatusObject);
       return CommandUtil.fromObjectToThrift(response, _serDe);
@@ -1861,7 +1863,7 @@ public class BlurControllerServer extends TableAdmin implements Iface {
           }
         }));
       }
-      return new ArrayList<String>(result).subList(startingAt, Math.min(fetch, result.size()));
+      return new ArrayList<String>(result).subList(startingAt, startingAt + Math.min(fetch, result.size()));
     } catch (Exception e) {
       throw new BException(e.getMessage(), e);
     }
@@ -1876,7 +1878,15 @@ public class BlurControllerServer extends TableAdmin implements Iface {
         CommandStatus cs = scatterGather(cluster, new BlurCommand<CommandStatus>() {
           @Override
           public CommandStatus call(Client client) throws BlurException, TException {
-            return client.commandStatus(commandExecutionId);
+            try {
+              return client.commandStatus(commandExecutionId);
+            } catch (BlurException e) {
+              String message = e.getMessage();
+              if (message.startsWith("NOT_FOUND")) {
+                return null;
+              }
+              throw e;
+            }
           }
         }, new Merger<CommandStatus>() {
           @Override
@@ -1884,12 +1894,16 @@ public class BlurControllerServer extends TableAdmin implements Iface {
             CommandStatus commandStatus = null;
             while (service.getRemainingCount() > 0) {
               Future<CommandStatus> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
-              commandStatus = mergeCommandStatus(commandStatus, service.getResultThrowException(future));
+              commandStatus = CommandStatusUtil.mergeCommandStatus(commandStatus,
+                  service.getResultThrowException(future));
             }
             return commandStatus;
           }
         });
-        commandStatus = mergeCommandStatus(commandStatus, cs);
+        commandStatus = CommandStatusUtil.mergeCommandStatus(commandStatus, cs);
+      }
+      if (commandStatus == null) {
+        throw new BException("NOT_FOUND {0}", commandExecutionId);
       }
       return commandStatus;
     } catch (Exception e) {
@@ -1897,60 +1911,6 @@ public class BlurControllerServer extends TableAdmin implements Iface {
     }
   }
 
-  private static CommandStatus mergeCommandStatus(CommandStatus cs1, CommandStatus cs2) {
-    if (cs1 == null && cs2 == null) {
-      return null;
-    } else if (cs1 == null) {
-      return cs2;
-    } else if (cs2 == null) {
-      return cs1;
-    } else {
-      Map<String, Map<CommandStatusState, Long>> serverStateMap1 = cs1.getServerStateMap();
-      Map<String, Map<CommandStatusState, Long>> serverStateMap2 = cs2.getServerStateMap();
-      Map<String, Map<CommandStatusState, Long>> merge = mergeServerStateMap(serverStateMap1, serverStateMap2);
-      return new CommandStatus(cs1.getExecutionId(), cs1.getCommandName(), cs1.getArguments(), merge, cs1.getUser());
-    }
-  }
-
-  private static Map<String, Map<CommandStatusState, Long>> mergeServerStateMap(
-      Map<String, Map<CommandStatusState, Long>> serverStateMap1,
-      Map<String, Map<CommandStatusState, Long>> serverStateMap2) {
-    Map<String, Map<CommandStatusState, Long>> result = new HashMap<String, Map<CommandStatusState, Long>>();
-    Set<String> keys = new HashSet<String>();
-    keys.addAll(serverStateMap1.keySet());
-    keys.addAll(serverStateMap2.keySet());
-    for (String key : keys) {
-      Map<CommandStatusState, Long> css1 = serverStateMap2.get(key);
-      Map<CommandStatusState, Long> css2 = serverStateMap2.get(key);
-      result.put(key, mergeCommandStatusState(css1, css2));
-    }
-    return result;
-  }
-
-  private static Map<CommandStatusState, Long> mergeCommandStatusState(Map<CommandStatusState, Long> css1,
-      Map<CommandStatusState, Long> css2) {
-    if (css1 == null && css2 == null) {
-      return new HashMap<CommandStatusState, Long>();
-    } else if (css1 == null) {
-      return css2;
-    } else if (css2 == null) {
-      return css1;
-    } else {
-      Map<CommandStatusState, Long> result = new HashMap<CommandStatusState, Long>(css1);
-      for (Entry<CommandStatusState, Long> e : css2.entrySet()) {
-        CommandStatusState key = e.getKey();
-        Long l = result.get(key);
-        Long value = e.getValue();
-        if (l == null) {
-          result.put(key, value);
-        } else {
-          result.put(key, l + value);
-        }
-      }
-      return result;
-    }
-  }
-
   @Override
   public void commandCancel(String commandExecutionId) throws BlurException, TException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 08b4400..ff03210 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -684,7 +684,11 @@ public class BlurShardServer extends TableAdmin implements Iface {
   @Override
   public CommandStatus commandStatus(String commandExecutionId) throws BlurException, TException {
     try {
-      return _commandManager.getCommandStatus(commandExecutionId);
+      CommandStatus commandStatus = _commandManager.getCommandStatus(commandExecutionId);
+      if (commandStatus == null) {
+        throw new BException("NOT_FOUND {0}", commandExecutionId);
+      }
+      return commandStatus;
     } catch (Exception e) {
       throw new BException(e.getMessage(), e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 0b4e290..46bd8b0 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -316,7 +316,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     StreamServer streamServer;
     int streamThreadCount = configuration.getInt(BLUR_STREAM_SERVER_THREADS, 100);
     if (streamThreadCount > 0) {
-      StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpPath);
+      StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpPath, config);
       streamServer = new StreamServer(0, streamThreadCount, streamProcessor);
       streamServer.start();
       configuration.setInt(BLUR_STREAM_SERVER_RUNNING_PORT, streamServer.getPort());

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
index c9c3774..3242931 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
@@ -16,24 +16,50 @@
  */
 package org.apache.blur.utils;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
 public class GCWatcher {
 
   private static final String JAVA_VERSION = "java.version";
   private static final String _1_7 = "1.7";
+  private static final String _1_8 = "1.8";
   private static final boolean JDK7;
 
   static {
     Properties properties = System.getProperties();
     String javaVersion = properties.getProperty(JAVA_VERSION);
-    if (javaVersion.startsWith(_1_7)) {
+    if (javaVersion.startsWith(_1_7) || javaVersion.startsWith(_1_8)) {
       JDK7 = true;
     } else {
       JDK7 = false;
     }
   }
 
+  public static void main(String[] args) {
+    GCWatcher.init(0.50);
+
+    GCWatcher.registerAction(new GCAction() {
+      @Override
+      public void takeAction() throws Exception {
+        System.out.println("OOM");
+        System.exit(0);
+      }
+    });
+
+    List<byte[]> lst = new ArrayList<byte[]>();
+
+    while (true) {
+      lst.add(new byte[1_000_000]);
+      MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+      System.out.println(heapMemoryUsage.getMax() + " " + heapMemoryUsage.getUsed());
+    }
+
+  }
+
   /**
    * Initializes the GCWatcher to watch for any garbage collection that leaves
    * more than the given ratio free. If more remains then all the given

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java b/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java
index 03bf6bb..2eb9f56 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java
@@ -151,7 +151,7 @@ public class GCWatcherJdk6 extends TimerTask {
         }
         _lastIndex = _gcInfo.getIndex();
       }
-    } catch (Exception e) {
+    } catch (Throwable e) {
       e.printStackTrace();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index ae635c3..f46b184 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -41,12 +41,15 @@ import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -292,6 +295,20 @@ public class IndexImporterTest {
   }
 
   @Test
+  public void testIndexImporterWithCorrectRowIdShardCombinationWithFastImport() throws IOException {
+    List<Field> document = _fieldManager.getFields("1", genRecord("1"));
+    document.add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+    _commitWriter.addDocument(document);
+    _commitWriter.commit();
+    _commitWriter.close();
+    _indexImporter.run();
+    assertFalse(_fileSystem.exists(_path));
+    assertFalse(_fileSystem.exists(_badRowIdsPath));
+    assertTrue(_fileSystem.exists(_inUsePath));
+    validateIndex();
+  }
+
+  @Test
   public void testIndexImporterWithWrongRowIdShardCombination() throws IOException {
     List<Field> document = _fieldManager.getFields("2", genRecord("1"));
     _commitWriter.addDocument(document);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java
----------------------------------------------------------------------
diff --git a/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java b/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java
index 059ad05..ea7e5ad 100644
--- a/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java
+++ b/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java
@@ -338,6 +338,7 @@ public class FilterAccessControlFactory extends AccessControlFactory {
       }
       List<IndexableField> result = new ArrayList<IndexableField>();
       for (IndexableField field : fields) {
+        // If field is to be indexed and is to be read masked.
         if (fieldsToMask.contains(field.name())) {
           // If field is a doc value, then don't bother indexing.
           if (!isDocValue(field)) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01416562/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java
----------------------------------------------------------------------
diff --git a/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java b/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java
index 8a142cf..37e17a6 100644
--- a/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java
+++ b/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java
@@ -31,6 +31,7 @@ import org.apache.lucene.index.IndexReader.ReaderClosedListener;
 import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.search.DocIdSet;
 import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.OpenBitSet;
 
@@ -62,12 +63,16 @@ public class BitSetDocumentVisibilityFilterCacheStrategy extends DocumentVisibil
 
   @Override
   public Builder createBuilder(String fieldName, BytesRef term, final AtomicReader reader) {
-    final OpenBitSet bitSet = new OpenBitSet(reader.maxDoc());
+    int maxDoc = reader.maxDoc();
     final Key key = new Key(fieldName, term, reader.getCoreCacheKey());
     LOG.debug("Creating new bitset for key [" + key + "] on index [" + reader + "]");
     return new Builder() {
+
+      private OpenBitSet bitSet = new OpenBitSet(maxDoc);
+
       @Override
       public void or(DocIdSetIterator it) throws IOException {
+        LOG.debug("Building bitset for key [" + key + "]");
         int doc;
         while ((doc = it.nextDoc()) != DocsEnum.NO_MORE_DOCS) {
           bitSet.set(doc);
@@ -76,7 +81,6 @@ public class BitSetDocumentVisibilityFilterCacheStrategy extends DocumentVisibil
 
       @Override
       public DocIdSet getDocIdSet() throws IOException {
-        LOG.debug("Building bitset for key [" + key + "]");
         SegmentReader segmentReader = getSegmentReader(reader);
         segmentReader.addReaderClosedListener(new ReaderClosedListener() {
           @Override
@@ -88,12 +92,172 @@ public class BitSetDocumentVisibilityFilterCacheStrategy extends DocumentVisibil
             }
           }
         });
-        _cache.put(key, bitSet);
-        return bitSet;
+        long cardinality = bitSet.cardinality();
+        DocIdSet cacheDocIdSet;
+        if (isFullySet(maxDoc, bitSet, cardinality)) {
+          cacheDocIdSet = getFullySetDocIdSet(maxDoc);
+        } else if (isFullyEmpty(bitSet, cardinality)) {
+          cacheDocIdSet = getFullyEmptyDocIdSet(maxDoc);
+        } else {
+          cacheDocIdSet = bitSet;
+        }
+        _cache.put(key, cacheDocIdSet);
+        return cacheDocIdSet;
+      }
+    };
+  }
+
+  public static DocIdSet getFullyEmptyDocIdSet(int maxDoc) {
+    Bits bits = getFullyEmptyBits(maxDoc);
+    return new DocIdSet() {
+      @Override
+      public DocIdSetIterator iterator() throws IOException {
+        return getFullyEmptyDocIdSetIterator(maxDoc);
+      }
+
+      @Override
+      public Bits bits() throws IOException {
+        return bits;
+      }
+
+      @Override
+      public boolean isCacheable() {
+        return true;
+      }
+    };
+  }
+
+  public static DocIdSetIterator getFullyEmptyDocIdSetIterator(int maxDoc) {
+    return new DocIdSetIterator() {
+
+      private int _docId = -1;
+
+      @Override
+      public int docID() {
+        return _docId;
+      }
+
+      @Override
+      public int nextDoc() throws IOException {
+        return _docId = DocIdSetIterator.NO_MORE_DOCS;
+      }
+
+      @Override
+      public int advance(int target) throws IOException {
+        return _docId = DocIdSetIterator.NO_MORE_DOCS;
+      }
+
+      @Override
+      public long cost() {
+        return 0;
+      }
+    };
+  }
+
+  public static Bits getFullyEmptyBits(int maxDoc) {
+    return new Bits() {
+      @Override
+      public boolean get(int index) {
+        return false;
+      }
+
+      @Override
+      public int length() {
+        return maxDoc;
+      }
+    };
+  }
+
+  public static DocIdSet getFullySetDocIdSet(int maxDoc) {
+    Bits bits = getFullySetBits(maxDoc);
+    return new DocIdSet() {
+      @Override
+      public DocIdSetIterator iterator() throws IOException {
+        return getFullySetDocIdSetIterator(maxDoc);
+      }
+
+      @Override
+      public Bits bits() throws IOException {
+        return bits;
+      }
+
+      @Override
+      public boolean isCacheable() {
+        return true;
+      }
+    };
+  }
+
+  public static DocIdSetIterator getFullySetDocIdSetIterator(int maxDoc) {
+    return new DocIdSetIterator() {
+
+      private int _docId = -1;
+
+      @Override
+      public int advance(int target) throws IOException {
+        if (_docId == DocIdSetIterator.NO_MORE_DOCS) {
+          return DocIdSetIterator.NO_MORE_DOCS;
+        }
+        _docId = target;
+        if (_docId >= maxDoc) {
+          return _docId = DocIdSetIterator.NO_MORE_DOCS;
+        }
+        return _docId;
+      }
+
+      @Override
+      public int nextDoc() throws IOException {
+        if (_docId == DocIdSetIterator.NO_MORE_DOCS) {
+          return DocIdSetIterator.NO_MORE_DOCS;
+        }
+        _docId++;
+        if (_docId >= maxDoc) {
+          return _docId = DocIdSetIterator.NO_MORE_DOCS;
+        }
+        return _docId;
+      }
+
+      @Override
+      public int docID() {
+        return _docId;
+      }
+
+      @Override
+      public long cost() {
+        return 0l;
       }
+
     };
   }
 
+  public static Bits getFullySetBits(int maxDoc) {
+    return new Bits() {
+      @Override
+      public boolean get(int index) {
+        return true;
+      }
+
+      @Override
+      public int length() {
+        return maxDoc;
+      }
+    };
+  }
+
+  public static boolean isFullyEmpty(OpenBitSet bitSet, long cardinality) {
+    if (cardinality == 0) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isFullySet(int maxDoc, OpenBitSet bitSet, long cardinality) {
+    if (cardinality >= maxDoc) {
+      return true;
+    }
+    return false;
+  }
+
   public static SegmentReader getSegmentReader(IndexReader indexReader) throws IOException {
     if (indexReader instanceof SegmentReader) {
       return (SegmentReader) indexReader;


Mime
View raw message