incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Changing the queue behavior. The queue will no longer commit to the writer upon every iteration of the queue. Instead it will check to see if any other writing threads require the writer, and if so the queue will commit and release. How
Date Fri, 14 Mar 2014 01:06:02 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 61480125d -> b32b1854b


Changing the queue behavior.  The queue will no longer commit to the writer upon every iteration of the queue.  Instead it will check to see if any other writing threads require the writer, and if so the queue will commit and release.  However there is a configurable max time that the queue will wait before committing.  Default is 5 seconds.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/cc599ec2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/cc599ec2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/cc599ec2

Branch: refs/heads/apache-blur-0.2
Commit: cc599ec2cafa6523d5e444eb552c790d58770dc2
Parents: 6148012
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Mar 13 21:00:05 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Mar 13 21:00:05 2014 -0400

----------------------------------------------------------------------
 .../blur/manager/writer/BaseQueueReader.java    | 124 -------------
 .../manager/writer/BlurIndexSimpleWriter.java   |  58 +++---
 .../apache/blur/manager/writer/IndexAction.java |  13 ++
 .../blur/manager/writer/MutationQueue.java      |   2 +-
 .../manager/writer/MutationQueueProcessor.java  | 180 +++++++++++++++++++
 .../blur/manager/writer/ShardQueueReader.java   |  63 -------
 .../blur/manager/writer/TableQueueReader.java   |  61 -------
 .../org/apache/blur/server/TableContext.java    |  37 ----
 .../writer/BlurIndexSimpleWriterTest.java       |  21 ++-
 .../blur/thrift/util/EnqueueDataAsync.java      | 146 +++++++++++++++
 .../org/apache/blur/utils/BlurConstants.java    |  33 ++--
 .../src/main/resources/blur-default.properties  |   9 +
 docs/cluster-setup.html                         |   4 +-
 13 files changed, 410 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/main/java/org/apache/blur/manager/writer/BaseQueueReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BaseQueueReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BaseQueueReader.java
deleted file mode 100644
index 590ec73..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BaseQueueReader.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.writer;
-
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_MAX;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.RowMutation;
-
-public abstract class BaseQueueReader implements Closeable {
-
-  private static final Log LOG = LogFactory.getLog(BaseQueueReader.class);
-
-  protected final long _backOff;
-  protected final Thread _daemon;
-  protected final AtomicBoolean _running = new AtomicBoolean();
-  protected final int _max;
-  protected final String _context;
-  protected final BlurConfiguration _configuration;
-
-  public BaseQueueReader(BlurConfiguration configuration, String context) {
-    _running.set(true);
-    _configuration = configuration;
-    _backOff = configuration.getLong(BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF, 500);
-    _max = configuration.getInt(BLUR_SHARD_INDEX_QUEUE_READER_MAX, 500);
-    _context = context;
-    _daemon = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        List<RowMutation> mutations = new ArrayList<RowMutation>();
-        while (_running.get()) {
-          take(mutations, _max);
-          if (mutations.isEmpty()) {
-            try {
-              Thread.sleep(_backOff);
-            } catch (InterruptedException e) {
-              return;
-            }
-          } else {
-            try {
-              mutations = MutatableAction.reduceMutates(mutations);
-            } catch (BlurException e) {
-              LOG.error("Unknown error while trying to reduce the number of mutations and prevent data loss.", e);
-              failure();
-              mutations.clear();
-              return;
-            }
-            doMutate(mutations);
-          }
-        }
-      }
-    });
-
-    _daemon.setName("Queue Loader for [" + _context + "]");
-    _daemon.setDaemon(true);
-  }
-
-  protected abstract void doMutate(List<RowMutation> mutations);
-
-  public void listen() {
-    _daemon.start();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (_running.get()) {
-      _running.set(false);
-      _daemon.interrupt();
-    }
-  }
-
-  /**
-   * Takes up to the max number of {@link RowMutation}s off the queue and
-   * returns. The implementation can choose to block until new items are
-   * available. However if the method returns without adding any items to the
-   * mutations list, the loading thread will back off a configurable amount of
-   * time. <br/>
-   * <br/>
-   * Configuration setting: &quot;blur.shard.index.queue.reader.backoff&quot;
-   * 
-   * @param mutations
-   * @param max
-   */
-  public abstract void take(List<RowMutation> mutations, int max);
-
-  /**
-   * This method will be called after each successful load of data from the
-   * queue. This will allow the queue to be notified that the information has
-   * been successfully loaded.
-   */
-  public abstract void success();
-
-  /**
-   * This method will be called after each failed load of data from the queue.
-   * This will allow the queue to be notified that the information has been WAS
-   * NOT successfully loaded.
-   */
-  public abstract void failure();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index be083c0..dee38c0 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -21,13 +21,17 @@ import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.blur.analysis.FieldManager;
 import org.apache.blur.index.ExitableReader;
@@ -69,9 +73,9 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final AtomicReference<BlurIndexWriter> _writer = new AtomicReference<BlurIndexWriter>();
   private final boolean _makeReaderExitable = true;
   private IndexImporter _indexImporter;
-  private ShardQueueReader _queueReader;
-  private final ReadWriteLock _lock = new ReentrantReadWriteLock();
-  private final Lock _writeLock = _lock.writeLock();
+  // private ShardQueueReader _queueReader;
+  private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
+  private final WriteLock _writeLock = _lock.writeLock();
   private final ReadWriteLock _indexRefreshLock = new ReentrantReadWriteLock();
   private final Lock _indexRefreshWriteLock = _indexRefreshLock.writeLock();
   private final Lock _indexRefreshReadLock = _indexRefreshLock.readLock();
@@ -80,7 +84,9 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final IndexDeletionPolicyReader _policy;
   private final SnapshotIndexDeletionPolicy _snapshotIndexDeletionPolicy;
   private final String _context;
-  private final MutationQueue _mutationQueue;
+  private final AtomicInteger _writesWaiting = new AtomicInteger();
+  private final BlockingQueue<RowMutation> _queue = new ArrayBlockingQueue<RowMutation>(100);
+  private final MutationQueueProcessor _mutationQueueProcessor;
 
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
       final ExecutorService searchExecutor, BlurIndexCloser indexCloser, BlurIndexWarmup indexWarmup)
@@ -105,7 +111,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _policy = new IndexDeletionPolicyReader(_snapshotIndexDeletionPolicy);
     _conf.setIndexDeletionPolicy(_policy);
 
-    _mutationQueue = new MutationQueue();
+    _mutationQueueProcessor = new MutationQueueProcessor(_queue, this, _shardContext, _writesWaiting);
 
     if (!DirectoryReader.indexExists(directory)) {
       new BlurIndexWriter(directory, _conf).close();
@@ -122,7 +128,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   }
 
   private synchronized void openWriter() {
-    IOUtils.cleanup(LOG, _queueReader, _indexImporter);
+    IOUtils.cleanup(LOG, _indexImporter);
     BlurIndexWriter writer = _writer.get();
     if (writer != null) {
       try {
@@ -155,30 +161,6 @@ public class BlurIndexSimpleWriter extends BlurIndex {
             _writer.notify();
           }
           _indexImporter = new IndexImporter(BlurIndexSimpleWriter.this, _shardContext, TimeUnit.SECONDS, 10);
-          _queueReader = new ShardQueueReader(BlurIndexSimpleWriter.this, _shardContext) {
-
-            private final List<RowMutation> _list = new ArrayList<RowMutation>();
-
-            @Override
-            public void take(List<RowMutation> mutations, int max) {
-              _list.clear();
-              _mutationQueue.take(mutations, max);
-              _list.addAll(mutations);
-              LOG.debug("Number of messages taken [{0}]", _list.size());
-            }
-
-            @Override
-            public void success() {
-
-            }
-
-            @Override
-            public void failure() {
-              // Do something with stored mutations
-            }
-
-          };
-          _queueReader.listen();
         } catch (IOException e) {
           LOG.error("Unknown error on index writer open.", e);
         }
@@ -243,7 +225,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   @Override
   public void close() throws IOException {
     _isClosed.set(true);
-    IOUtils.cleanup(LOG, _indexImporter, _queueReader, _writer.get(), _indexReader.get());
+    IOUtils.cleanup(LOG, _indexImporter, _mutationQueueProcessor, _writer.get(), _indexReader.get());
   }
 
   @Override
@@ -327,7 +309,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     DirectoryReader currentReader = _indexReader.get();
     DirectoryReader newReader = DirectoryReader.openIfChanged(currentReader);
     if (newReader == null) {
-      LOG.error("Reader should be new after commit for table [{0}] shard [{1}].", _tableContext.getTable(),
+      LOG.debug("Reader should be new after commit for table [{0}] shard [{1}].", _tableContext.getTable(),
           _shardContext.getShard());
     } else {
       DirectoryReader reader = wrap(newReader);
@@ -344,7 +326,10 @@ public class BlurIndexSimpleWriter extends BlurIndex {
 
   @Override
   public void process(IndexAction indexAction) throws IOException {
+    _writesWaiting.incrementAndGet();
     _writeLock.lock();
+    _writesWaiting.decrementAndGet();
+    indexAction.setWritesWaiting(_writesWaiting);
     waitUntilNotNull(_writer);
     BlurIndexWriter writer = _writer.get();
     IndexSearcherClosable indexSearcher = null;
@@ -374,11 +359,18 @@ public class BlurIndexSimpleWriter extends BlurIndex {
 
   @Override
   public void enqueue(List<RowMutation> mutations) throws IOException {
+    startQueueIfNeeded();
     try {
-      _mutationQueue.put(mutations);
+      for (RowMutation mutation : mutations) {
+        _queue.put(mutation);
+      }
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
   }
 
+  private void startQueueIfNeeded() {
+    _mutationQueueProcessor.startIfNotRunning();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/main/java/org/apache/blur/manager/writer/IndexAction.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexAction.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexAction.java
index 4fee343..6d56c5a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexAction.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexAction.java
@@ -17,12 +17,15 @@
 package org.apache.blur.manager.writer;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.lucene.index.IndexWriter;
 
 public abstract class IndexAction {
 
+  private AtomicInteger _writesWaiting;
+
   public abstract void doPreCommit(IndexSearcherClosable indexSearcher, IndexWriter writer) throws IOException;
 
   public abstract void doPostCommit(IndexWriter writer) throws IOException;
@@ -33,4 +36,14 @@ public abstract class IndexAction {
 
   public abstract void performMutate(IndexSearcherClosable searcher, IndexWriter writer) throws IOException;
 
+  public void setWritesWaiting(AtomicInteger writesWaiting) {
+    _writesWaiting = writesWaiting;
+  }
+
+  public boolean isWritersWaiting() {
+    if (_writesWaiting.get() > 0) {
+      return true;
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java
index 4465a1e..9957f0d 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java
@@ -24,7 +24,7 @@ import org.apache.blur.thrift.generated.RowMutation;
 
 public class MutationQueue {
 
-  private BlockingQueue<RowMutation> _queue = new ArrayBlockingQueue<RowMutation>(1000);
+  private BlockingQueue<RowMutation> _queue = new ArrayBlockingQueue<RowMutation>(100);
 
   public void put(List<RowMutation> mutations) throws InterruptedException {
     for (RowMutation mutation : mutations) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java
new file mode 100644
index 0000000..4b1b701
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_PAUSE_TIME_WHEN_EMPTY;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_QUEUE_BATCH_SIZE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_WRITER_LOCK_TIME;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.lucene.index.IndexWriter;
+
+public class MutationQueueProcessor implements Runnable, Closeable {
+
+  private final Log LOG = LogFactory.getLog(MutationQueueProcessor.class);
+
+  private final BlockingQueue<RowMutation> _queue;
+  private final BlurIndex _blurIndex;
+  private final int _maxQueueBatch;
+  private final long _maxProcessingTime;
+  private final ShardContext _context;
+  private final AtomicBoolean _running = new AtomicBoolean(false);
+  private final AtomicInteger _writesWaiting;
+  private final long _timeInMsThatQueueWritesPauseWhenEmpty;
+  private Thread _daemonThread;
+
+  public MutationQueueProcessor(BlockingQueue<RowMutation> queue, BlurIndex blurIndex, ShardContext context,
+      AtomicInteger writesWaiting) {
+    _queue = queue;
+    _blurIndex = blurIndex;
+    _context = context;
+    TableContext tableContext = _context.getTableContext();
+    BlurConfiguration blurConfiguration = tableContext.getBlurConfiguration();
+
+    _maxQueueBatch = blurConfiguration.getInt(BLUR_SHARD_QUEUE_MAX_QUEUE_BATCH_SIZE, 100);
+    _maxProcessingTime = TimeUnit.MILLISECONDS.toNanos(blurConfiguration.getInt(BLUR_SHARD_QUEUE_MAX_WRITER_LOCK_TIME,
+        5000));
+    _timeInMsThatQueueWritesPauseWhenEmpty = blurConfiguration
+        .getLong(BLUR_SHARD_QUEUE_MAX_PAUSE_TIME_WHEN_EMPTY, 1000);
+    _writesWaiting = writesWaiting;
+  }
+
+  public void startIfNotRunning() {
+    synchronized (_running) {
+      if (!_running.get()) {
+        _running.set(true);
+        _daemonThread = new Thread(this);
+        _daemonThread.setDaemon(true);
+        _daemonThread.setName("Queue Thread [" + _context.getTableContext().getTable() + "/" + _context.getShard()
+            + "]");
+        _daemonThread.start();
+        LOG.info("Thread [{0}] starting.", _daemonThread.getName());
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (_running) {
+      if (_running.get()) {
+        LOG.info("Thread [{0}] stopping.", _daemonThread.getName());
+        _running.set(false);
+        _daemonThread.interrupt();
+        _daemonThread = null;
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    while (_running.get()) {
+      try {
+        MutationQueueProcessorIndexAction indexAction = new MutationQueueProcessorIndexAction();
+        _blurIndex.process(indexAction);
+        if (!indexAction.hadMutationsToIndex()) {
+          try {
+            Thread.sleep(_timeInMsThatQueueWritesPauseWhenEmpty);
+          } catch (InterruptedException e) {
+            return;
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Unknown error during processing of queue mutations.", e);
+      }
+    }
+  }
+
+  class MutationQueueProcessorIndexAction extends IndexAction {
+
+    private final long _start = System.nanoTime();
+    private boolean _didMutates = false;
+
+    private boolean shouldContinueProcessing() {
+      if (_start + _maxProcessingTime < System.nanoTime()) {
+        return false;
+      }
+      if (_writesWaiting.get() > 0) {
+        return false;
+      }
+      return true;
+    }
+
+    public boolean hadMutationsToIndex() {
+      return _didMutates;
+    }
+
+    @Override
+    public void performMutate(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
+      List<RowMutation> lst = new ArrayList<RowMutation>();
+      while (shouldContinueProcessing()) {
+        if (_queue.drainTo(lst, _maxQueueBatch) > 0) {
+          try {
+            List<RowMutation> reduceMutates = MutatableAction.reduceMutates(lst);
+            MutatableAction mutatableAction = new MutatableAction(_context);
+            mutatableAction.mutate(reduceMutates);
+            LOG.debug("Mutating [{0}]", reduceMutates.size());
+            mutatableAction.performMutate(searcher, writer);
+            _didMutates = true;
+          } catch (BlurException e) {
+            LOG.error("Unknown error during reduce of mutations.", e);
+          }
+        } else {
+          _didMutates = false;
+        }
+        lst.clear();
+      }
+    }
+
+    @Override
+    public void doPreCommit(IndexSearcherClosable indexSearcher, IndexWriter writer) throws IOException {
+
+    }
+
+    @Override
+    public void doPostCommit(IndexWriter writer) throws IOException {
+
+    }
+
+    @Override
+    public void doPreRollback(IndexWriter writer) throws IOException {
+
+    }
+
+    @Override
+    public void doPostRollback(IndexWriter writer) throws IOException {
+
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/main/java/org/apache/blur/manager/writer/ShardQueueReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/ShardQueueReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/ShardQueueReader.java
deleted file mode 100644
index 34bcf2e..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/ShardQueueReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.writer;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.RowMutation;
-
-public abstract class ShardQueueReader extends BaseQueueReader implements Closeable {
-
-  private static final Log LOG = LogFactory.getLog(ShardQueueReader.class);
-
-  private final BlurIndex _index;
-  private final ShardContext _shardContext;
-  private final TableContext _tableContext;
-
-  public ShardQueueReader(BlurIndex index, ShardContext shardContext) {
-    super(shardContext.getTableContext().getBlurConfiguration(), getContext(shardContext));
-    _index = index;
-    _shardContext = shardContext;
-    _tableContext = shardContext.getTableContext();
-  }
-
-  protected void doMutate(List<RowMutation> mutations) {
-    MutatableAction mutatableAction = new MutatableAction(_shardContext);
-    mutatableAction.mutate(mutations);
-    try {
-      _index.process(mutatableAction);
-      success();
-    } catch (IOException e) {
-      failure();
-      LOG.error("Unknown error during loading of rowmutations from queue [{0}] into table [{1}] and shard [{2}].",
-          this.toString(), _tableContext.getTable(), _shardContext.getShard());
-    } finally {
-      mutations.clear();
-    }
-  }
-
-  private static String getContext(ShardContext shardContext) {
-    return shardContext.getTableContext() + "/" + shardContext.getShard();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/main/java/org/apache/blur/manager/writer/TableQueueReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/TableQueueReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/TableQueueReader.java
deleted file mode 100644
index 2dc8101..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/TableQueueReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.writer;
-
-import java.io.Closeable;
-import java.util.List;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.RowMutation;
-
-public abstract class TableQueueReader extends BaseQueueReader implements Closeable {
-
-  private static final Log LOG = LogFactory.getLog(TableQueueReader.class);
-
-  protected final TableContext _tableContext;
-  protected final Iface _client;
-
-  public TableQueueReader(Iface client, TableContext tableContext) {
-    super(tableContext.getBlurConfiguration(), tableContext.getTable());
-    _client = client;
-    _tableContext = tableContext;
-  }
-
-  @Override
-  protected void doMutate(List<RowMutation> mutations) {
-    try {
-      _client.enqueueMutateBatch(mutations);
-      success();
-    } catch (BlurException e) {
-      failure();
-      LOG.error("Unknown error during loading of rowmutations from queue [{0}] into table [{1}].", this.toString(),
-          _tableContext.getTable());
-    } catch (TException e) {
-      failure();
-      LOG.error("Unknown error during loading of rowmutations from queue [{0}] into table [{1}].", this.toString(),
-          _tableContext.getTable());
-    } finally {
-      mutations.clear();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index ea94823..788d34f 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -19,7 +19,6 @@ package org.apache.blur.server;
 import static org.apache.blur.utils.BlurConstants.BLUR_FIELDTYPE;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLURINDEX_CLASS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_DELETION_POLICY_MAXAGE;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_CLASS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_SIMILARITY;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_READ_INTERCEPTOR;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_TIME_BETWEEN_COMMITS;
@@ -50,7 +49,6 @@ import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
-import org.apache.blur.manager.writer.ShardQueueReader;
 //import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.thrift.generated.ScoreType;
@@ -366,39 +364,4 @@ public class TableContext {
     return _readInterceptor;
   }
 
-  @SuppressWarnings("unchecked")
-  public ShardQueueReader getQueueReader(BlurIndex blurIndex, ShardContext shardContext) throws IOException {
-    String className = _blurConfiguration.get(BLUR_SHARD_INDEX_QUEUE_READER_CLASS);
-    if (className == null || className.trim().isEmpty()) {
-      return null;
-    }
-    Class<? extends ShardQueueReader> clazz;
-    try {
-      clazz = (Class<? extends ShardQueueReader>) Class.forName(className);
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-    try {
-      Constructor<? extends ShardQueueReader> constructor = clazz.getConstructor(new Class[] { BlurIndex.class,
-          ShardContext.class });
-      ShardQueueReader reader = constructor.newInstance(blurIndex, shardContext);
-
-      reader.listen();
-
-      return reader;
-    } catch (NoSuchMethodException e) {
-      throw new IOException(e);
-    } catch (SecurityException e) {
-      throw new IOException(e);
-    } catch (InstantiationException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    } catch (InvocationTargetException e) {
-      throw new IOException(e);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
index b31e30d..eefba67 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -46,6 +48,7 @@ import org.apache.blur.trace.BaseTraceStorage;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.TraceCollector;
 import org.apache.blur.trace.TraceStorage;
+import org.apache.blur.utils.BlurConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -101,6 +104,12 @@ public class BlurIndexSimpleWriterTest {
       uuid = UUID.randomUUID().toString();
     }
     tableDescriptor.setTableUri(new File(_base, "table-store-" + uuid).toURI().toString());
+    Map<String, String> tableProperties = new HashMap<String, String>();
+    tableProperties.put(BlurConstants.BLUR_SHARD_QUEUE_MAX_PAUSE_TIME_WHEN_EMPTY, "500");
+    tableProperties.put(BlurConstants.BLUR_SHARD_QUEUE_MAX_QUEUE_BATCH_SIZE, "500");
+    tableProperties.put(BlurConstants.BLUR_SHARD_QUEUE_MAX_WRITER_LOCK_TIME, "1000");
+
+    tableDescriptor.setTableProperties(tableProperties);
     TableContext tableContext = TableContext.create(tableDescriptor);
     File path = new File(_base, "index_" + uuid);
     path.mkdirs();
@@ -288,11 +297,15 @@ public class BlurIndexSimpleWriterTest {
   @Test
   public void testEnqueue() throws IOException, InterruptedException {
     setupWriter(_configuration);
+    runQueueTest(TOTAL_ROWS_FOR_TESTS, TOTAL_ROWS_FOR_TESTS);
+  }
+
+  private void runQueueTest(final int mutatesToAdd, int numberOfValidDocs) throws IOException, InterruptedException {
     final String table = _writer.getShardContext().getTableContext().getTable();
     Thread thread = new Thread(new Runnable() {
       @Override
       public void run() {
-        for (int i = 0; i < TOTAL_ROWS_FOR_TESTS; i++) {
+        for (int i = 0; i < mutatesToAdd; i++) {
           try {
             _writer.enqueue(Arrays.asList(genRowMutation(table)));
           } catch (IOException e) {
@@ -302,14 +315,16 @@ public class BlurIndexSimpleWriterTest {
       }
     });
     thread.start();
+    long start = System.currentTimeMillis();
     while (true) {
-      if (_writer.getIndexSearcher().getIndexReader().numDocs() == TOTAL_ROWS_FOR_TESTS) {
+      if (_writer.getIndexSearcher().getIndexReader().numDocs() == numberOfValidDocs) {
+        long end = System.currentTimeMillis();
+        System.out.println("[" + TOTAL_ROWS_FOR_TESTS + "] Mutations in [" + (end - start) + " ms]");
         break;
       }
       Thread.sleep(100);
     }
     thread.join();
-    // YAY!!! it worked!
   }
 
   private RowMutation genRowMutation(String table) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-thrift/src/main/java/org/apache/blur/thrift/util/EnqueueDataAsync.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/EnqueueDataAsync.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/EnqueueDataAsync.java
new file mode 100644
index 0000000..b405422
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/EnqueueDataAsync.java
@@ -0,0 +1,146 @@
+package org.apache.blur.thrift.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.async.AsyncMethodCallback;
+import org.apache.blur.thrift.AsyncClientPool;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.Blur.AsyncClient.enqueueMutate_call;
+import org.apache.blur.thrift.generated.Blur.AsyncIface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
+
+public class EnqueueDataAsync {
+
+  private static Random random = new Random();
+  private static List<String> words = new ArrayList<String>();
+
+  public static void main(String[] args) throws BlurException, TException, IOException {
+    loadWords();
+    final int numberOfColumns = 3;
+    int numberRows = 100000;
+    final int numberRecordsPerRow = 2;
+    final int numberOfFamilies = 3;
+    final int numberOfWords = 30;
+    int count = 0;
+    int max = 1000;
+    long start = System.currentTimeMillis();
+    final String table = "test";
+    AsyncClientPool pool = new AsyncClientPool();
+    AsyncIface client = pool.getClient(Blur.AsyncIface.class, args[0]);
+    for (int i = 0; i < numberRows; i++) {
+      if (count >= max) {
+        double seconds = (System.currentTimeMillis() - start) / 1000.0;
+        double rate = i / seconds;
+        System.out.println("Rows indexed [" + i + "] at [" + rate + "/s]");
+        count = 0;
+      }
+      client.enqueueMutate(
+          getRowMutation(table, numberRecordsPerRow, numberOfColumns, numberOfFamilies, numberOfWords),
+          new AsyncMethodCallback<Blur.AsyncClient.enqueueMutate_call>() {
+            @Override
+            public void onError(Exception exception) {
+
+            }
+
+            @Override
+            public void onComplete(enqueueMutate_call response) {
+
+            }
+          });
+      count++;
+    }
+  }
+
+  private static RowMutation getRowMutation(String table, int numberRecordsPerRow, int numberOfColumns,
+      int numberOfFamilies, int numberOfWords) {
+    RowMutation mutation = new RowMutation();
+    mutation.setTable(table);
+    String rowId = getRowId();
+    mutation.setRowId(rowId);
+    mutation.setRowMutationType(RowMutationType.REPLACE_ROW);
+    for (int j = 0; j < numberRecordsPerRow; j++) {
+      mutation.addToRecordMutations(getRecordMutation(numberOfColumns, numberOfFamilies, numberOfWords));
+    }
+    return mutation;
+  }
+
+  private static void loadWords() throws IOException {
+    InputStream inputStream = EnqueueDataAsync.class.getResourceAsStream("words.txt");
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+    String word;
+    while ((word = reader.readLine()) != null) {
+      words.add(word.trim());
+    }
+    reader.close();
+  }
+
+  protected static RecordMutation getRecordMutation(int numberOfColumns, int numberOfFamilies, int numberOfWords) {
+    RecordMutation recordMutation = new RecordMutation();
+    recordMutation.setRecord(getRecord(numberOfColumns, numberOfFamilies, numberOfWords));
+    recordMutation.setRecordMutationType(RecordMutationType.REPLACE_ENTIRE_RECORD);
+    return recordMutation;
+  }
+
+  private static Record getRecord(int numberOfColumns, int numberOfFamilies, int numberOfWords) {
+    Record record = new Record();
+    record.setRecordId(getRowId());
+    record.setFamily(getFamily(numberOfFamilies));
+    for (int i = 0; i < numberOfColumns; i++) {
+      record.addToColumns(new Column("col" + i, getWords(numberOfWords)));
+    }
+    return record;
+  }
+
+  private static String getWords(int numberOfWords) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < numberOfWords; i++) {
+      if (i != 0) {
+        builder.append(' ');
+      }
+      builder.append(getWord());
+    }
+    return builder.toString();
+  }
+
+  private static String getFamily(int numberOfFamilies) {
+    return "fam" + random.nextInt(numberOfFamilies);
+  }
+
+  private static String getWord() {
+    return words.get(random.nextInt(words.size()));
+  }
+
+  protected static String getRowId() {
+    return Long.toString(Math.abs(random.nextLong())) + "-" + Long.toString(Math.abs(random.nextLong()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 9d7b3e7..0edb2ce 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -36,15 +36,15 @@ public class BlurConstants {
   public static final String DEFAULT_FAMILY = "_default_";
   public static final String SUPER = "super";
   public static final String SEP = ".";
-  
-  public static final String BLUR_SHARD_INDEX_QUEUE_READER_CLASS = "blur.shard.index.queue.reader.class";
-  public static final String BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF = "blur.shard.index.queue.reader.backoff";
-  public static final String BLUR_SHARD_INDEX_QUEUE_READER_MAX = "blur.shard.index.queue.reader.max";
-  
-  public static final String BLUR_TABLE_INDEX_QUEUE_READER_CLASS = "blur.table.index.queue.reader.class";
-  public static final String BLUR_TABLE_INDEX_QUEUE_READER_BACKOFF = "blur.table.index.queue.reader.backoff";
-  public static final String BLUR_TABLE_INDEX_QUEUE_READER_MAX = "blur.table.index.queue.reader.max";
-  
+
+  public static final String BLUR_SHARD_QUEUE_MAX_PAUSE_TIME_WHEN_EMPTY = "blur.shard.queue.max.pause.time.when.empty";
+  public static final String BLUR_SHARD_QUEUE_MAX_WRITER_LOCK_TIME = "blur.shard.queue.max.writer.lock.time";
+  public static final String BLUR_SHARD_QUEUE_MAX_QUEUE_BATCH_SIZE = "blur.shard.queue.max.queue.batch.size";
+
+//  public static final String BLUR_TABLE_INDEX_QUEUE_READER_CLASS = "blur.table.index.queue.reader.class";
+//  public static final String BLUR_TABLE_INDEX_QUEUE_READER_BACKOFF = "blur.table.index.queue.reader.backoff";
+//  public static final String BLUR_TABLE_INDEX_QUEUE_READER_MAX = "blur.table.index.queue.reader.max";
+
   public static final String FAST_DECOMPRESSION = "FAST_DECOMPRESSION";
   public static final String FAST = "FAST";
   public static final String HIGH_COMPRESSION = "HIGH_COMPRESSION";
@@ -97,7 +97,6 @@ public class BlurConstants {
   public static final String BLUR_SHARD_THRIFT_ACCEPT_QUEUE_SIZE_PER_THREAD = "blur.shard.thrift.accept.queue.size.per.thread";
   public static final String BLUR_SHARD_DISTRIBUTED_LAYOUT_FACTORY_CLASS = "blur.shard.distributed.layout.factory.class";
   public static final String BLUR_SHARD_WARMUP_DISABLED = "blur.shard.warmup.disabled";
-  
 
   public static final String BLUR_SHARD_BLOCK_CACHE_V2_READ_CACHE_EXT = "blur.shard.block.cache.v2.read.cache.ext";
   public static final String BLUR_SHARD_BLOCK_CACHE_V2_READ_NOCACHE_EXT = "blur.shard.block.cache.v2.read.nocache.ext";
@@ -112,7 +111,7 @@ public class BlurConstants {
   public static final String BLUR_SHARD_BLOCK_CACHE_V2_FILE_BUFFER_SIZE = "blur.shard.block.cache.v2.fileBufferSize";
   public static final String BLUR_SHARD_BLOCK_CACHE_V2_CACHE_BLOCK_SIZE = "blur.shard.block.cache.v2.cacheBlockSize";
   public static final String BLUR_SHARD_BLURINDEX_CLASS = "blur.shard.blurindex.class";
-  
+
   public static final String BLUR_FIELDTYPE = "blur.fieldtype.";
 
   public static final String BLUR_SHARD_TIME_BETWEEN_COMMITS = "blur.shard.time.between.commits";
@@ -121,7 +120,7 @@ public class BlurConstants {
   public static final String BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT = "blur.controller.server.thrift.thread.count";
   public static final String BLUR_CONTROLLER_SERVER_REMOTE_THREAD_COUNT = "blur.controller.server.remote.thread.count";
   public static final String BLUR_CONTROLLER_REMOTE_FETCH_COUNT = "blur.controller.remote.fetch.count";
-  
+
   public static final String BLUR_CONTROLLER_SHARD_CONNECTION_TIMEOUT = "blur.controller.shard.connection.timeout";
   public static final String BLUR_CONTROLLER_RETRY_MAX_MUTATE_RETRIES = "blur.controller.retry.max.mutate.retries";
   public static final String BLUR_CONTROLLER_RETRY_MAX_DEFAULT_RETRIES = "blur.controller.retry.max.default.retries";
@@ -138,12 +137,12 @@ public class BlurConstants {
   public static final String BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD = "blur.clientpool.client.close.threshold";
   public static final String BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY = "blur.clientpool.client.clean.frequency";
   public static final String BLUR_LUCENE_FST_BYTEARRAY_FACTORY = "blur.lucene.fst.bytearray.factory";
-  
+
   public static final String BLUR_THRIFT_MAX_FRAME_SIZE = "blur.thrift.max.frame.size";
-  
+
   public static final String BLUR_SHARD_FILTERED_SERVER_CLASS = "blur.shard.filtered.server.class";
   public static final String BLUR_CONTROLLER_FILTERED_SERVER_CLASS = "blur.controller.filtered.server.class";
-  
+
   public static final String BLUR_GUI_CONTROLLER_PORT = "blur.gui.controller.port";
   public static final String BLUR_GUI_SHARD_PORT = "blur.gui.shard.port";
 
@@ -154,9 +153,9 @@ public class BlurConstants {
   public static final long ZK_WAIT_TIME = TimeUnit.SECONDS.toMillis(5);
   public static final String DELETE_MARKER_VALUE = "delete";
   public static final String DELETE_MARKER = "_deletemarker_";
-  
+
   public static final String SHARED_MERGE_SCHEDULER = "sharedMergeScheduler";
-  
+
   public static final String BLUR_FILTER_ALIAS = "blur.filter.alias.";
 
   static {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 10f8f77..0dd0f19 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -187,6 +187,15 @@ blur.shard.read.interceptor=
 # Defines the byte array factory class that blur will use to manage the FST trees in Lucene (extends org.apache.blur.lucene.fst.ByteArrayFactory).
 blur.lucene.fst.bytearray.factory=
 
+# The maximum amount of time to pause, when the queue is empty, before checking the queue again for RowMutations.
+blur.shard.queue.max.pause.time.when.empty=1000
+
+# The maximum amount of time (in milliseconds) that the queue can lock the writer before committing.  NOTE: Any other writer event will cause the queue to release its lock on the writer.
+blur.shard.queue.max.writer.lock.time=5000
+
+# The maximum number of RowMutations the writer can drain from the queue at a time.
+blur.shard.queue.max.queue.batch.size=100
+
 
 ### Controller Server Configuration
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc599ec2/docs/cluster-setup.html
----------------------------------------------------------------------
diff --git a/docs/cluster-setup.html b/docs/cluster-setup.html
index 3b2929c..177786e 100644
--- a/docs/cluster-setup.html
+++ b/docs/cluster-setup.html
@@ -132,7 +132,7 @@ blur.cluster.default.table.uri=hdfs://namenode/blur/tables</code>
 			<h4>Default Properties</h4>
 			<table class="table-bordered table-striped table-condensed">
 			<tr><td>Property</td><td>Description</td></tr>
-<tr><td>blur.zookeeper.timeout (90000)</td><td>The zookeeper session timeout</td></tr><tr><td>blur.zookeeper.trace.path (/blur/traces)</td><td>The path in ZooKeeper where the distributed traces will be stored, if blank trace output will be written to the log or the HDFS store.</td></tr><tr><td>blur.hdfs.trace.path</td><td>The path in HDFS where the distributed traces will be stored, if blank trace output will be written to the log or the ZooKeeper store.</td></tr><tr><td>blur.query.max.results.fetch (1000)</td><td>The maximum number of results that can be fetched in a single request</td></tr><tr><td>blur.query.max.row.fetch (100)</td><td>The maximum number of rows that can be fetched in a single request</td></tr><tr><td>blur.query.max.record.fetch (1000)</td><td>The maximum number of records that can be fetched in a single request</td></tr><tr><td>blur.metrics.reporters</td><td>Setup metric reporter</td></tr><tr><td>blur.thrift.max.frame.size (16384000)</td><td>Thrift max frame size
 </td></tr>
+<tr><td>blur.zookeeper.timeout (90000)</td><td>The zookeeper session timeout</td></tr><tr><td>blur.hdfs.trace.path</td><td>The path in HDFS where the distributed traces will be stored, if blank trace output will be written to the log or the ZooKeeper store.</td></tr><tr><td>blur.query.max.results.fetch (1000)</td><td>The maximum number of results that can be fetched in a single request</td></tr><tr><td>blur.query.max.row.fetch (100)</td><td>The maximum number of rows that can be fetched in a single request</td></tr><tr><td>blur.query.max.record.fetch (1000)</td><td>The maximum number of records that can be fetched in a single request</td></tr><tr><td>blur.metrics.reporters</td><td>Setup metric reporter</td></tr><tr><td>blur.thrift.max.frame.size (16384000)</td><td>Thrift max frame size</td></tr>
 			</table>
             <h3 id="general-hadoop">Hadoop</h3>
 <p>
@@ -204,7 +204,7 @@ Swap can kill java perform, you may want to consider disabling swap.</div>
 			<h4>Default Properties</h4>
 			<table class="table-bordered table-striped table-condensed">
 			<tr><td>Property</td><td>Description</td></tr>
-<tr><td>blur.shard.hostname</td><td>The hostname for the shard, if blank the hostname is automatically detected</td></tr><tr><td>blur.shard.bind.address (0.0.0.0)</td><td>The binding address of the shard</td></tr><tr><td>blur.shard.bind.port (40020)</td><td>The default binding port of the shard server</td></tr><tr><td>blur.shard.data.fetch.thread.count (8)</td><td>The number of fetcher threads</td></tr><tr><td>blur.shard.server.thrift.thread.count (8)</td><td>The number of the thrift threads</td></tr><tr><td>blur.shard.thrift.selector.threads (2)</td><td>The number of threads used for selector processing inside the thrift server.</td></tr><tr><td>blur.shard.thrift.max.read.buffer.bytes (9223372036854775807)</td><td>The maximum number of bytes used for reading requests in the thrift server.</td></tr><tr><td>blur.shard.thrift.accept.queue.size.per.thread (4)</td><td>The size of the blocking queue per selector thread for passing accepted connections to the selector thread.</td></tr><tr
 ><td>blur.shard.opener.thread.count (8)</td><td>The number of threads that are used for opening indexes</td></tr><tr><td>blur.shard.cache.max.querycache.elements (128)</td><td>The number of cached queries</td></tr><tr><td>blur.shard.cache.max.timetolive (60000)</td><td>The time to live for the cache query</td></tr><tr><td>blur.shard.filter.cache.class (org.apache.blur.manager.DefaultBlurFilterCache)</td><td>Default implementation of the blur cache filter, which is a pass through filter that does nothing</td></tr><tr><td>blur.shard.index.warmup.class (org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup)</td><td>Default Blur index warmup class that warms the fields provided in the table descriptor</td></tr><tr><td>blur.shard.index.warmup.throttle (30000000)</td><td>Throttles the warmup to 30MB/s across all the warmup threads</td></tr><tr><td>blur.shard.block.cache.version (v2)</td><td>By default the v2 version of the block cache is enabled</td></tr><tr><td>blur.shard.block.cach
 e.total.size</td><td>By default the total amount of memory block cache will use is -XX:MaxDirectMemorySize - 64 MiB</td></tr><tr><td>blur.shard.blockcache.direct.memory.allocation (true)</td><td>v1 version of block cache only. By default the block cache using off heap memory</td></tr><tr><td>blur.shard.blockcache.slab.count (-1)</td><td>v1 version of block cache only. The slabs in the blockcache are automatically configured by default (-1) otherwise 1 slab equals 128MB.  The auto config is detected through the MaxDirectoryMemorySize provided to the JVM</td></tr><tr><td>blur.shard.block.cache.v2.fileBufferSize (8192)</td><td>v2 version of block cache only. File buffer size, this is the buffer size used to read and write to data to HDFS.  For production this will likely be increased.</td></tr><tr><td>blur.shard.block.cache.v2.cacheBlockSize (8192)</td><td>v2 version of block cache only. The is the size of the blocks in the off heap cache, it is good practice to have this match 'blur.s
 hard.block.cache.v2.fileBufferSize'.  For production this will likely be increased.</td></tr><tr><td>blur.shard.block.cache.v2.cacheBlockSize.filter (33554432)</td><td>blur.shard.block.cache.v2.cacheBlockSize.<ext>=</td></tr><tr><td>blur.shard.block.cache.v2.store (OFF_HEAP)</td><td>v2 version of block cache only. This is used to control if the block are created on or off heap.  Values are OFF_HEAP | ON_HEAP</td></tr><tr><td>blur.shard.block.cache.v2.read.cache.ext</td><td>v2 version of block cache only. This specifies what file types should be cached during reads.  Comma delimited list.</td></tr><tr><td>blur.shard.block.cache.v2.read.nocache.ext (fdt)</td><td>v2 version of block cache only. This specifies what file types should NOT be cached during reads.  Comma delimited list.</td></tr><tr><td>blur.shard.block.cache.v2.read.default (true)</td><td>v2 version of block cache only. This specifies the default behavior if a file type is not specified in the cache or nocache lists during
  reads.  Values true | false</td></tr><tr><td>blur.shard.block.cache.v2.write.cache.ext</td><td>v2 version of block cache only. This specifies what file types should be cached during writes.  Comma delimited list.</td></tr><tr><td>blur.shard.block.cache.v2.write.nocache.ext (fdt)</td><td>v2 version of block cache only. This specifies what file types should NOT be cached during writes.  Comma delimited list.</td></tr><tr><td>blur.shard.block.cache.v2.write.default (true)</td><td>v2 version of block cache only. This specifies the default behavior if a file type is not specified in the cache or nocache lists during writes.  Values true | false</td></tr><tr><td>blur.shard.index.compressionmode (FAST)</td><td>Sets the compression of used in the storing of the fields. Valid entries are FAST FAST_DECOMPRESSION HIGH_COMPRESSION</td></tr><tr><td>blur.shard.index.chunksize (16384)</td><td>Sets the chunksize of the compression in the storing of the fields. Larger values may produce smaller fdt
  files at the small cost of fetch performance.</td></tr><tr><td>blur.shard.buffercache.8192 (67108864)</td><td>The amount of memory to be used by 8K byte buffers.  Note if you change the "blur.shard.block.cache.v2.cacheBlockSize" or "blur.shard.block.cache.v2.fileBufferSize" you should adjust the buffer sizes as well as the total memory allocated.  For example if you increased the "blur.shard.block.cache.v2.fileBufferSize" to 64K (65536) then this property should to "blur.shard.buffercache.65536".  You can also define as many of these properties as needed.</td></tr><tr><td>blur.shard.buffercache.1024 (8388608)</td><td>The amount of memory to be used by 1K byte buffers.  Note if you change the "blur.shard.block.cache.v2.cacheBlockSize" or "blur.shard.block.cache.v2.fileBufferSize" you should adjust the buffer sizes as well as the total memory allocated.</td></tr><tr><td>blur.shard.safemodedelay (5000)</td><td>The number of milliseconds to wait for the cluster to settle once changes h
 ave ceased</td></tr><tr><td>blur.shard.time.between.commits (30000)</td><td>The default time between index commits</td></tr><tr><td>blur.shard.time.between.refreshs (3000)</td><td>The default time between index refreshs</td></tr><tr><td>blur.shard.merge.thread.count (3)</td><td>The max number of threads used during index merges</td></tr><tr><td>blur.max.clause.count (1024)</td><td>The maximum number of clauses in a BooleanQuery</td></tr><tr><td>blur.indexmanager.search.thread.count (8)</td><td>The number of thread used for parallel searching in the index manager</td></tr><tr><td>blur.indexmanager.mutate.thread.count (8)</td><td>The number of thread used for parallel mutating in the index manager</td></tr><tr><td>blur.shard.internal.search.thread.count (8)</td><td>The number of threads used for parallel searching in the index searchers</td></tr><tr><td>blur.shard.warmup.thread.count (8)</td><td>Number of threads used for warming up the index</td></tr><tr><td>blur.shard.fetchcount (10
 0)</td><td>The fetch count per Lucene search, this fetches pointers to hits</td></tr><tr><td>blur.max.heap.per.row.fetch (10000000)</td><td>Heap limit on row fetch, once this limit has been reached the request will return</td></tr><tr><td>blur.max.records.per.row.fetch.request (1000)</td><td>The maximum number of records in a single row fetch</td></tr><tr><td>blur.gui.shard.port (40090)</td><td>The http status page port for the shard server</td></tr><tr><td>blur.shard.filtered.server.class</td><td>To intercept the calls made to the shard server and perform server side changes to the calls extend org.apache.blur.server.FilteredBlurServer.</td></tr><tr><td>blur.shard.blurindex.class</td><td>Defines the blur index class to be used to handle index requests.  This class has to extend org.apache.blur.manager.writer.BlurIndex.  This can be defined globally as well as per table.</td></tr><tr><td>blur.shard.read.interceptor</td><td>Defines the blur read interceptor class that can mask data f
 rom query results as well as data fetches.</td></tr>
+<tr><td>blur.shard.hostname</td><td>The hostname for the shard, if blank the hostname is automatically detected</td></tr><tr><td>blur.shard.bind.address (0.0.0.0)</td><td>The binding address of the shard</td></tr><tr><td>blur.shard.bind.port (40020)</td><td>The default binding port of the shard server</td></tr><tr><td>blur.shard.data.fetch.thread.count (8)</td><td>The number of fetcher threads</td></tr><tr><td>blur.shard.server.thrift.thread.count (8)</td><td>The number of the thrift threads</td></tr><tr><td>blur.shard.thrift.selector.threads (2)</td><td>The number of threads used for selector processing inside the thrift server.</td></tr><tr><td>blur.shard.thrift.max.read.buffer.bytes (9223372036854775807)</td><td>The maximum number of bytes used for reading requests in the thrift server.</td></tr><tr><td>blur.shard.thrift.accept.queue.size.per.thread (4)</td><td>The size of the blocking queue per selector thread for passing accepted connections to the selector thread.</td></tr><tr
 ><td>blur.shard.opener.thread.count (8)</td><td>The number of threads that are used for opening indexes</td></tr><tr><td>blur.shard.cache.max.querycache.elements (128)</td><td>The number of cached queries</td></tr><tr><td>blur.shard.cache.max.timetolive (60000)</td><td>The time to live for the cache query</td></tr><tr><td>blur.shard.filter.cache.class (org.apache.blur.manager.DefaultBlurFilterCache)</td><td>Default implementation of the blur cache filter, which is a pass through filter that does nothing</td></tr><tr><td>blur.shard.warmup.disabled (false)</td><td>Globally disable index warmup.</td></tr><tr><td>blur.shard.index.warmup.class (org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup)</td><td>Default Blur index warmup class that warms the fields provided in the table descriptor</td></tr><tr><td>blur.shard.index.warmup.throttle (30000000)</td><td>Throttles the warmup to 30MB/s across all the warmup threads</td></tr><tr><td>blur.shard.block.cache.version (v2)</td><td>By 
 default the v2 version of the block cache is enabled</td></tr><tr><td>blur.shard.block.cache.total.size</td><td>By default the total amount of memory block cache will use is -XX:MaxDirectMemorySize - 64 MiB</td></tr><tr><td>blur.shard.blockcache.direct.memory.allocation (true)</td><td>v1 version of block cache only. By default the block cache using off heap memory</td></tr><tr><td>blur.shard.blockcache.slab.count (-1)</td><td>v1 version of block cache only. The slabs in the blockcache are automatically configured by default (-1) otherwise 1 slab equals 128MB.  The auto config is detected through the MaxDirectoryMemorySize provided to the JVM</td></tr><tr><td>blur.shard.block.cache.v2.fileBufferSize (8192)</td><td>v2 version of block cache only. File buffer size, this is the buffer size used to read and write to data to HDFS.  For production this will likely be increased.</td></tr><tr><td>blur.shard.block.cache.v2.cacheBlockSize (8192)</td><td>v2 version of block cache only. The is t
 he size of the blocks in the off heap cache, it is good practice to have this match 'blur.shard.block.cache.v2.fileBufferSize'.  For production this will likely be increased.</td></tr><tr><td>blur.shard.block.cache.v2.cacheBlockSize.filter (33554432)</td><td>blur.shard.block.cache.v2.cacheBlockSize.<ext>=</td></tr><tr><td>blur.shard.block.cache.v2.store (OFF_HEAP)</td><td>v2 version of block cache only. This is used to control if the block are created on or off heap.  Values are OFF_HEAP | ON_HEAP</td></tr><tr><td>blur.shard.block.cache.v2.read.cache.ext</td><td>v2 version of block cache only. This specifies what file types should be cached during reads.  Comma delimited list.</td></tr><tr><td>blur.shard.block.cache.v2.read.nocache.ext (fdt)</td><td>v2 version of block cache only. This specifies what file types should NOT be cached during reads.  Comma delimited list.</td></tr><tr><td>blur.shard.block.cache.v2.read.default (true)</td><td>v2 version of block cache only. This specifie
 s the default behavior if a file type is not specified in the cache or nocache lists during reads.  Values true | false</td></tr><tr><td>blur.shard.block.cache.v2.write.cache.ext</td><td>v2 version of block cache only. This specifies what file types should be cached during writes.  Comma delimited list.</td></tr><tr><td>blur.shard.block.cache.v2.write.nocache.ext (fdt)</td><td>v2 version of block cache only. This specifies what file types should NOT be cached during writes.  Comma delimited list.</td></tr><tr><td>blur.shard.block.cache.v2.write.default (true)</td><td>v2 version of block cache only. This specifies the default behavior if a file type is not specified in the cache or nocache lists during writes.  Values true | false</td></tr><tr><td>blur.shard.index.compressionmode (FAST)</td><td>Sets the compression of used in the storing of the fields. Valid entries are FAST FAST_DECOMPRESSION HIGH_COMPRESSION</td></tr><tr><td>blur.shard.index.chunksize (16384)</td><td>Sets the chunk
 size of the compression in the storing of the fields. Larger values may produce smaller fdt files at the small cost of fetch performance.</td></tr><tr><td>blur.shard.buffercache.8192 (67108864)</td><td>The amount of memory to be used by 8K byte buffers.  Note if you change the "blur.shard.block.cache.v2.cacheBlockSize" or "blur.shard.block.cache.v2.fileBufferSize" you should adjust the buffer sizes as well as the total memory allocated.  For example if you increased the "blur.shard.block.cache.v2.fileBufferSize" to 64K (65536) then this property should to "blur.shard.buffercache.65536".  You can also define as many of these properties as needed.</td></tr><tr><td>blur.shard.buffercache.1024 (8388608)</td><td>The amount of memory to be used by 1K byte buffers.  Note if you change the "blur.shard.block.cache.v2.cacheBlockSize" or "blur.shard.block.cache.v2.fileBufferSize" you should adjust the buffer sizes as well as the total memory allocated.</td></tr><tr><td>blur.shard.safemodedelay
  (5000)</td><td>The number of milliseconds to wait for the cluster to settle once changes have ceased</td></tr><tr><td>blur.shard.time.between.commits (30000)</td><td>The default time between index commits</td></tr><tr><td>blur.shard.time.between.refreshs (3000)</td><td>The default time between index refreshes</td></tr><tr><td>blur.shard.merge.thread.count (8)</td><td>The max number of threads used during index merges</td></tr><tr><td>blur.max.clause.count (1024)</td><td>The maximum number of clauses in a BooleanQuery</td></tr><tr><td>blur.indexmanager.search.thread.count (8)</td><td>The number of threads used for parallel searching in the index manager</td></tr><tr><td>blur.indexmanager.mutate.thread.count (8)</td><td>The number of threads used for parallel mutating in the index manager</td></tr><tr><td>blur.indexmanager.facet.thread.count (8)</td><td>The number of threads used for parallel faceting in the index manager</td></tr><tr><td>blur.shard.warmup.thread.count (8)</td><td>Number
  of threads used for warming up the index</td></tr><tr><td>blur.shard.fetchcount (110)</td><td>The fetch count per Lucene search, this fetches pointers to hits</td></tr><tr><td>blur.max.heap.per.row.fetch (10000000)</td><td>Heap limit on row fetch, once this limit has been reached the request will return</td></tr><tr><td>blur.max.records.per.row.fetch.request (1000)</td><td>The maximum number of records in a single row fetch</td></tr><tr><td>blur.gui.shard.port (40090)</td><td>The http status page port for the shard server</td></tr><tr><td>blur.shard.filtered.server.class</td><td>To intercept the calls made to the shard server and perform server side changes to the calls extend org.apache.blur.server.FilteredBlurServer.</td></tr><tr><td>blur.shard.blurindex.class</td><td>Defines the blur index class to be used to handle index requests.  This class has to extend org.apache.blur.manager.writer.BlurIndex.  This can be defined globally as well as per table.</td></tr><tr><td>blur.shard.r
 ead.interceptor</td><td>Defines the blur read interceptor class that can mask data from query results as well as data fetches.</td></tr><tr><td>blur.lucene.fst.bytearray.factory</td><td>Defines the byte array factory class that blur will use to manage the FST trees in Lucene (extends org.apache.blur.lucene.fst.ByteArrayFactory).</td></tr><tr><td>blur.shard.queue.max.pause.time.when.empty (1000)</td><td>The maximum amount of time to pause before checking the queue for RowMutations.</td></tr><tr><td>blur.shard.queue.max.writer.lock.time (5000)</td><td>The maximum amount of time that the queue can lock the writer before committing.  NOTE: Any other writer event will cause the queue to release its lock on the writer.</td></tr><tr><td>blur.shard.queue.max.queue.batch.size (100)</td><td>The maximum number of RowMutations the writer can drain from the queue at a time.</td></tr>
 			</table>
 
             <h3 id="shard-blur-env">blur-env.sh</h3>


Mime
View raw message