incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [5/6] git commit: Adding the enqueueMutate method through the Blur API stack. Still need to finish the TableQueueReader running in the controller for listening to logical queues.
Date Thu, 06 Mar 2014 21:40:45 GMT
Adding the enqueueMutate method through the Blur API stack.  Still need to finish the TableQueueReader running in the controller for listening to logical queues.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/8de60c97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/8de60c97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/8de60c97

Branch: refs/heads/apache-blur-0.2
Commit: 8de60c973dc5496a1f8c61766fb95c5b1ad3cc35
Parents: 2b8f22c
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Mar 5 21:52:38 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Mar 5 21:52:38 2014 -0500

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |  136 +-
 .../blur/manager/writer/BaseQueueReader.java    |  124 +
 .../apache/blur/manager/writer/BlurIndex.java   |    3 +
 .../blur/manager/writer/BlurIndexReadOnly.java  |   15 +-
 .../manager/writer/BlurIndexSimpleWriter.java   |   41 +-
 .../blur/manager/writer/MutatableAction.java    |  109 +
 .../blur/manager/writer/MutationQueue.java      |   39 +
 .../apache/blur/manager/writer/QueueReader.java |  130 -
 .../blur/manager/writer/ShardQueueReader.java   |   63 +
 .../blur/manager/writer/TableQueueReader.java   |   61 +
 .../apache/blur/server/FilteredBlurServer.java  |   10 +
 .../org/apache/blur/server/TableContext.java    |   12 +-
 .../blur/thrift/BlurControllerServer.java       |  101 +
 .../org/apache/blur/thrift/BlurShardServer.java |   36 +
 .../writer/BlurIndexSimpleWriterTest.java       |   74 +-
 .../blur/manager/writer/IndexImporterTest.java  |    6 +
 .../writer/QueueReaderBasicInMemory.java        |    2 +-
 .../org/apache/blur/thrift/BlurClusterTest.java |   32 +
 .../apache/blur/shell/LoadTestDataCommand.java  |    7 +-
 .../org/apache/blur/thrift/generated/Blur.java  | 2691 ++++++++++++++----
 .../blur/thrift/generated/SafeClientGen.java    |   84 +
 .../org/apache/blur/thrift/util/LoadData.java   |   22 +-
 .../org/apache/blur/utils/BlurConstants.java    |    4 +
 .../src/main/scripts/interface/Blur.thrift      |   16 +
 .../main/scripts/interface/gen-html/Blur.html   |   16 +
 .../main/scripts/interface/gen-html/index.html  |    2 +
 .../org/apache/blur/thrift/generated/Blur.java  | 2691 ++++++++++++++----
 .../src/main/scripts/interface/gen-js/Blur.js   |  740 +++--
 .../scripts/interface/gen-perl/Blur/Blur.pm     |  705 ++++-
 .../src/main/scripts/interface/gen-rb/blur.rb   |  118 +
 docs/Blur.html                                  |   16 +
 31 files changed, 6464 insertions(+), 1642 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index c8822d0..5e47a10 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -78,12 +77,8 @@ import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.FetchRowResult;
 import org.apache.blur.thrift.generated.HighlightOptions;
 import org.apache.blur.thrift.generated.QueryState;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.RecordMutation;
-import org.apache.blur.thrift.generated.RecordMutationType;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.RowMutation;
-import org.apache.blur.thrift.generated.RowMutationType;
 import org.apache.blur.thrift.generated.ScoreType;
 import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.trace.Trace;
@@ -1048,126 +1043,43 @@ public class IndexManager {
   }
 
   private void doMutates(List<RowMutation> mutations) throws BlurException, IOException {
-    mutations = reduceMutates(mutations);
+    mutations = MutatableAction.reduceMutates(mutations);
     Map<String, List<RowMutation>> map = getMutatesPerTable(mutations);
     for (Entry<String, List<RowMutation>> entry : map.entrySet()) {
       doMutates(entry.getKey(), entry.getValue());
     }
   }
-
-  private List<RowMutation> reduceMutates(List<RowMutation> mutations) throws BlurException {
-    Map<String, RowMutation> mutateMap = new TreeMap<String, RowMutation>();
-    for (RowMutation mutation : mutations) {
-      RowMutation rowMutation = mutateMap.get(mutation.getRowId());
-      if (rowMutation != null) {
-        mutateMap.put(mutation.getRowId(), merge(rowMutation, mutation));
-      } else {
-        mutateMap.put(mutation.getRowId(), mutation);
-      }
-    }
-    return new ArrayList<RowMutation>(mutateMap.values());
-  }
-
-  private RowMutation merge(RowMutation mutation1, RowMutation mutation2) throws BlurException {
-    RowMutationType rowMutationType1 = mutation1.getRowMutationType();
-    RowMutationType rowMutationType2 = mutation2.getRowMutationType();
-    if (!rowMutationType1.equals(rowMutationType2)) {
-      throw new BException(
-          "RowMutation conflict, cannot perform 2 different operations on the same row in the same batch. [{0}] [{1}]",
-          mutation1, mutation2);
-    }
-    if (rowMutationType1.equals(RowMutationType.DELETE_ROW)) {
-      // Since both are trying to delete the same row, just pick one and move
-      // on.
-      return mutation1;
-    } else if (rowMutationType1.equals(RowMutationType.REPLACE_ROW)) {
-      throw new BException(
-          "RowMutation conflict, cannot perform 2 different REPLACE_ROW mutations on the same row in the same batch. [{0}] [{1}]",
-          mutation1, mutation2);
-    } else {
-      // Now this is a row update, so try to merge the record mutations
-      List<RecordMutation> recordMutations1 = mutation1.getRecordMutations();
-      List<RecordMutation> recordMutations2 = mutation2.getRecordMutations();
-      List<RecordMutation> mergedRecordMutations = merge(recordMutations1, recordMutations2);
-      mutation1.setRecordMutations(mergedRecordMutations);
-      return mutation1;
-    }
-  }
-
-  private List<RecordMutation> merge(List<RecordMutation> recordMutations1, List<RecordMutation> recordMutations2)
-      throws BException {
-    Map<String, RecordMutation> recordMutationMap = new TreeMap<String, RecordMutation>();
-    merge(recordMutations1, recordMutationMap);
-    merge(recordMutations2, recordMutationMap);
-    return new ArrayList<RecordMutation>(recordMutationMap.values());
-  }
-
-  private void merge(List<RecordMutation> recordMutations, Map<String, RecordMutation> recordMutationMap)
-      throws BException {
-    for (RecordMutation recordMutation : recordMutations) {
-      Record record = recordMutation.getRecord();
-      String recordId = record.getRecordId();
-      RecordMutation existing = recordMutationMap.get(recordId);
-      if (existing != null) {
-        recordMutationMap.put(recordId, merge(recordMutation, existing));
-      } else {
-        recordMutationMap.put(recordId, recordMutation);
-      }
+  
+  public void enqueue(List<RowMutation> mutations) throws BlurException, IOException {
+    mutations = MutatableAction.reduceMutates(mutations);
+    Map<String, List<RowMutation>> map = getMutatesPerTable(mutations);
+    for (Entry<String, List<RowMutation>> entry : map.entrySet()) {
+      doEnqueue(entry.getKey(), entry.getValue());
     }
   }
-
-  private RecordMutation merge(RecordMutation recordMutation1, RecordMutation recordMutation2) throws BException {
-    RecordMutationType recordMutationType1 = recordMutation1.getRecordMutationType();
-    RecordMutationType recordMutationType2 = recordMutation2.getRecordMutationType();
-    if (!recordMutationType1.equals(recordMutationType2)) {
-      throw new BException(
-          "RecordMutation conflict, cannot perform 2 different operations on the same record in the same row in the same batch. [{0}] [{1}]",
-          recordMutation1, recordMutation2);
-    }
-
-    if (recordMutationType1.equals(RecordMutationType.DELETE_ENTIRE_RECORD)) {
-      // Since both are trying to delete the same record, just pick one and move
-      // on.
-      return recordMutation1;
-    } else if (recordMutationType1.equals(RecordMutationType.REPLACE_ENTIRE_RECORD)) {
-      throw new BException(
-          "RecordMutation conflict, cannot perform 2 different replace record operations on the same record in the same row in the same batch. [{0}] [{1}]",
-          recordMutation1, recordMutation2);
-    } else if (recordMutationType1.equals(RecordMutationType.REPLACE_COLUMNS)) {
-      throw new BException(
-          "RecordMutation conflict, cannot perform 2 different replace columns operations on the same record in the same row in the same batch. [{0}] [{1}]",
-          recordMutation1, recordMutation2);
-    } else {
-      Record record1 = recordMutation1.getRecord();
-      Record record2 = recordMutation2.getRecord();
-      String family1 = record1.getFamily();
-      String family2 = record2.getFamily();
-
-      if (isSameFamily(family1, family2)) {
-        record1.getColumns().addAll(record2.getColumns());
-        return recordMutation1;
-      } else {
-        throw new BException("RecordMutation conflict, cannot merge records with different family. [{0}] [{1}]",
-            recordMutation1, recordMutation2);
+  
+  private void doEnqueue(final String table, List<RowMutation> mutations) throws IOException, BlurException {
+    final Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    Map<String, List<RowMutation>> mutationsByShard = new HashMap<String, List<RowMutation>>();
+    for (int i = 0; i < mutations.size(); i++) {
+      RowMutation mutation = mutations.get(i);
+      String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table), _blurPartitioner);
+      List<RowMutation> list = mutationsByShard.get(shard);
+      if (list == null) {
+        list = new ArrayList<RowMutation>();
+        mutationsByShard.put(shard, list);
       }
+      list.add(mutation);
     }
-  }
-
-  private boolean isSameFamily(String family1, String family2) {
-    if (family1 == null && family2 == null) {
-      return true;
-    }
-    if (family1 != null && family1.equals(family2)) {
-      return true;
+    for (Entry<String, List<RowMutation>> entry : mutationsByShard.entrySet()) {
+      BlurIndex index = indexes.get(entry.getKey());
+      index.enqueue(entry.getValue());
     }
-    return false;
   }
 
   private void doMutates(final String table, List<RowMutation> mutations) throws IOException, BlurException {
     final Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
-
     Map<String, List<RowMutation>> mutationsByShard = new HashMap<String, List<RowMutation>>();
-
     for (int i = 0; i < mutations.size(); i++) {
       RowMutation mutation = mutations.get(i);
       String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table), _blurPartitioner);
@@ -1351,4 +1263,8 @@ public class IndexManager {
     }
   }
 
+  public void enqueue(RowMutation mutation) throws BlurException, IOException {
+    enqueue(Arrays.asList(mutation));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/BaseQueueReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BaseQueueReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BaseQueueReader.java
new file mode 100644
index 0000000..590ec73
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BaseQueueReader.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_MAX;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.RowMutation;
+
+public abstract class BaseQueueReader implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(BaseQueueReader.class);
+
+  protected final long _backOff;
+  protected final Thread _daemon;
+  protected final AtomicBoolean _running = new AtomicBoolean();
+  protected final int _max;
+  protected final String _context;
+  protected final BlurConfiguration _configuration;
+
+  public BaseQueueReader(BlurConfiguration configuration, String context) {
+    _running.set(true);
+    _configuration = configuration;
+    _backOff = configuration.getLong(BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF, 500);
+    _max = configuration.getInt(BLUR_SHARD_INDEX_QUEUE_READER_MAX, 500);
+    _context = context;
+    _daemon = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        List<RowMutation> mutations = new ArrayList<RowMutation>();
+        while (_running.get()) {
+          take(mutations, _max);
+          if (mutations.isEmpty()) {
+            try {
+              Thread.sleep(_backOff);
+            } catch (InterruptedException e) {
+              return;
+            }
+          } else {
+            try {
+              mutations = MutatableAction.reduceMutates(mutations);
+            } catch (BlurException e) {
+              LOG.error("Unknown error while trying to reduce the number of mutations and prevent data loss.", e);
+              failure();
+              mutations.clear();
+              return;
+            }
+            doMutate(mutations);
+          }
+        }
+      }
+    });
+
+    _daemon.setName("Queue Loader for [" + _context + "]");
+    _daemon.setDaemon(true);
+  }
+
+  protected abstract void doMutate(List<RowMutation> mutations);
+
+  public void listen() {
+    _daemon.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (_running.get()) {
+      _running.set(false);
+      _daemon.interrupt();
+    }
+  }
+
+  /**
+   * Takes up to the max number of {@link RowMutation}s off the queue and
+   * returns. The implementation can choose to block until new items are
+   * available. However if the method returns without adding any items to the
+   * mutations list, the loading thread will back off a configurable amount of
+   * time. <br/>
+   * <br/>
+   * Configuration setting: &quot;blur.shard.index.queue.reader.backoff&quot;
+   * 
+   * @param mutations
+   * @param max
+   */
+  public abstract void take(List<RowMutation> mutations, int max);
+
+  /**
+   * This method will be called after each successful load of data from the
+   * queue. This will allow the queue to be notified that the information has
+   * been successfully loaded.
+   */
+  public abstract void success();
+
+  /**
+   * This method will be called after each failed load of data from the queue.
+   * This will allow the queue to be notified that the information WAS NOT
+   * successfully loaded.
+   */
+  public abstract void failure();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index 23464b9..f8ff71a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
+import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexReaderContext;
@@ -133,4 +134,6 @@ public abstract class BlurIndex {
 
   public abstract void process(IndexAction indexAction) throws IOException;
 
+  public abstract void enqueue(List<RowMutation> mutations) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
index 1860291..c2aee75 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
@@ -21,7 +21,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.server.IndexSearcherClosable;
-import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
 
 public class BlurIndexReadOnly extends BlurIndex {
 
@@ -32,14 +32,6 @@ public class BlurIndexReadOnly extends BlurIndex {
     _blurIndex = blurIndex;
   }
 
-  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
-    throw new IOException("Read Only");
-  }
-
-  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
-    throw new IOException("Read Only");
-  }
-
   public IndexSearcherClosable getIndexSearcher() throws IOException {
     return _blurIndex.getIndexSearcher();
   }
@@ -77,4 +69,9 @@ public class BlurIndexReadOnly extends BlurIndex {
     throw new RuntimeException("Read-only shard");
   }
 
+  @Override
+  public void enqueue(List<RowMutation> mutations) {
+    throw new RuntimeException("Read-only shard");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index a644c8f..be083c0 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -40,6 +40,7 @@ import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
 import org.apache.hadoop.fs.Path;
@@ -68,7 +69,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final AtomicReference<BlurIndexWriter> _writer = new AtomicReference<BlurIndexWriter>();
   private final boolean _makeReaderExitable = true;
   private IndexImporter _indexImporter;
-  private QueueReader _queueReader;
+  private ShardQueueReader _queueReader;
   private final ReadWriteLock _lock = new ReentrantReadWriteLock();
   private final Lock _writeLock = _lock.writeLock();
   private final ReadWriteLock _indexRefreshLock = new ReentrantReadWriteLock();
@@ -79,6 +80,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final IndexDeletionPolicyReader _policy;
   private final SnapshotIndexDeletionPolicy _snapshotIndexDeletionPolicy;
   private final String _context;
+  private final MutationQueue _mutationQueue;
 
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
       final ExecutorService searchExecutor, BlurIndexCloser indexCloser, BlurIndexWarmup indexWarmup)
@@ -103,6 +105,8 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _policy = new IndexDeletionPolicyReader(_snapshotIndexDeletionPolicy);
     _conf.setIndexDeletionPolicy(_policy);
 
+    _mutationQueue = new MutationQueue();
+
     if (!DirectoryReader.indexExists(directory)) {
       new BlurIndexWriter(directory, _conf).close();
     }
@@ -118,6 +122,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   }
 
   private synchronized void openWriter() {
+    IOUtils.cleanup(LOG, _queueReader, _indexImporter);
     BlurIndexWriter writer = _writer.get();
     if (writer != null) {
       try {
@@ -150,7 +155,30 @@ public class BlurIndexSimpleWriter extends BlurIndex {
             _writer.notify();
           }
           _indexImporter = new IndexImporter(BlurIndexSimpleWriter.this, _shardContext, TimeUnit.SECONDS, 10);
-          _queueReader = _tableContext.getQueueReader(BlurIndexSimpleWriter.this, _shardContext);
+          _queueReader = new ShardQueueReader(BlurIndexSimpleWriter.this, _shardContext) {
+
+            private final List<RowMutation> _list = new ArrayList<RowMutation>();
+
+            @Override
+            public void take(List<RowMutation> mutations, int max) {
+              _list.clear();
+              _mutationQueue.take(mutations, max);
+              _list.addAll(mutations);
+              LOG.debug("Number of messages taken [{0}]", _list.size());
+            }
+
+            @Override
+            public void success() {
+
+            }
+
+            @Override
+            public void failure() {
+              // Do something with stored mutations
+            }
+
+          };
+          _queueReader.listen();
         } catch (IOException e) {
           LOG.error("Unknown error on index writer open.", e);
         }
@@ -344,4 +372,13 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     return _snapshotIndexDeletionPolicy.getSnapshotsDirectoryPath();
   }
 
+  @Override
+  public void enqueue(List<RowMutation> mutations) throws IOException {
+    try {
+      _mutationQueue.put(mutations);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
index 2a7159b..b245545 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
@@ -33,7 +33,9 @@ import org.apache.blur.manager.IndexManager;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.BException;
 import org.apache.blur.thrift.MutationHelper;
+import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.FetchRowResult;
@@ -429,4 +431,111 @@ public class MutatableAction extends IndexAction {
     }
   }
 
+  public static List<RowMutation> reduceMutates(List<RowMutation> mutations) throws BlurException {
+    Map<String, RowMutation> mutateMap = new TreeMap<String, RowMutation>();
+    for (RowMutation mutation : mutations) {
+      RowMutation rowMutation = mutateMap.get(mutation.getRowId());
+      if (rowMutation != null) {
+        mutateMap.put(mutation.getRowId(), merge(rowMutation, mutation));
+      } else {
+        mutateMap.put(mutation.getRowId(), mutation);
+      }
+    }
+    return new ArrayList<RowMutation>(mutateMap.values());
+  }
+
+  private static RowMutation merge(RowMutation mutation1, RowMutation mutation2) throws BlurException {
+    RowMutationType rowMutationType1 = mutation1.getRowMutationType();
+    RowMutationType rowMutationType2 = mutation2.getRowMutationType();
+    if (!rowMutationType1.equals(rowMutationType2)) {
+      throw new BException(
+          "RowMutation conflict, cannot perform 2 different operations on the same row in the same batch. [{0}] [{1}]",
+          mutation1, mutation2);
+    }
+    if (rowMutationType1.equals(RowMutationType.DELETE_ROW)) {
+      // Since both are trying to delete the same row, just pick one and move
+      // on.
+      return mutation1;
+    } else if (rowMutationType1.equals(RowMutationType.REPLACE_ROW)) {
+      throw new BException(
+          "RowMutation conflict, cannot perform 2 different REPLACE_ROW mutations on the same row in the same batch. [{0}] [{1}]",
+          mutation1, mutation2);
+    } else {
+      // Now this is a row update, so try to merge the record mutations
+      List<RecordMutation> recordMutations1 = mutation1.getRecordMutations();
+      List<RecordMutation> recordMutations2 = mutation2.getRecordMutations();
+      List<RecordMutation> mergedRecordMutations = merge(recordMutations1, recordMutations2);
+      mutation1.setRecordMutations(mergedRecordMutations);
+      return mutation1;
+    }
+  }
+
+  private static List<RecordMutation> merge(List<RecordMutation> recordMutations1, List<RecordMutation> recordMutations2)
+      throws BException {
+    Map<String, RecordMutation> recordMutationMap = new TreeMap<String, RecordMutation>();
+    merge(recordMutations1, recordMutationMap);
+    merge(recordMutations2, recordMutationMap);
+    return new ArrayList<RecordMutation>(recordMutationMap.values());
+  }
+
+  private static void merge(List<RecordMutation> recordMutations, Map<String, RecordMutation> recordMutationMap)
+      throws BException {
+    for (RecordMutation recordMutation : recordMutations) {
+      Record record = recordMutation.getRecord();
+      String recordId = record.getRecordId();
+      RecordMutation existing = recordMutationMap.get(recordId);
+      if (existing != null) {
+        recordMutationMap.put(recordId, merge(recordMutation, existing));
+      } else {
+        recordMutationMap.put(recordId, recordMutation);
+      }
+    }
+  }
+
+  private static RecordMutation merge(RecordMutation recordMutation1, RecordMutation recordMutation2) throws BException {
+    RecordMutationType recordMutationType1 = recordMutation1.getRecordMutationType();
+    RecordMutationType recordMutationType2 = recordMutation2.getRecordMutationType();
+    if (!recordMutationType1.equals(recordMutationType2)) {
+      throw new BException(
+          "RecordMutation conflict, cannot perform 2 different operations on the same record in the same row in the same batch. [{0}] [{1}]",
+          recordMutation1, recordMutation2);
+    }
+
+    if (recordMutationType1.equals(RecordMutationType.DELETE_ENTIRE_RECORD)) {
+      // Since both are trying to delete the same record, just pick one and move
+      // on.
+      return recordMutation1;
+    } else if (recordMutationType1.equals(RecordMutationType.REPLACE_ENTIRE_RECORD)) {
+      throw new BException(
+          "RecordMutation conflict, cannot perform 2 different replace record operations on the same record in the same row in the same batch. [{0}] [{1}]",
+          recordMutation1, recordMutation2);
+    } else if (recordMutationType1.equals(RecordMutationType.REPLACE_COLUMNS)) {
+      throw new BException(
+          "RecordMutation conflict, cannot perform 2 different replace columns operations on the same record in the same row in the same batch. [{0}] [{1}]",
+          recordMutation1, recordMutation2);
+    } else {
+      Record record1 = recordMutation1.getRecord();
+      Record record2 = recordMutation2.getRecord();
+      String family1 = record1.getFamily();
+      String family2 = record2.getFamily();
+
+      if (isSameFamily(family1, family2)) {
+        record1.getColumns().addAll(record2.getColumns());
+        return recordMutation1;
+      } else {
+        throw new BException("RecordMutation conflict, cannot merge records with different family. [{0}] [{1}]",
+            recordMutation1, recordMutation2);
+      }
+    }
+  }
+
+  private static boolean isSameFamily(String family1, String family2) {
+    if (family1 == null && family2 == null) {
+      return true;
+    }
+    if (family1 != null && family1.equals(family2)) {
+      return true;
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java
new file mode 100644
index 0000000..4465a1e
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueue.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.blur.thrift.generated.RowMutation;
+
+/**
+ * Bounded in-memory buffer of {@link RowMutation}s, backed by an
+ * {@link ArrayBlockingQueue} with a fixed capacity of 1000 entries.
+ */
+public class MutationQueue {
+
+  // The queue instance is never reassigned; final makes that explicit.
+  private final BlockingQueue<RowMutation> _queue = new ArrayBlockingQueue<RowMutation>(1000);
+
+  /**
+   * Adds every mutation to the queue in list order, blocking while the queue
+   * is full.
+   *
+   * @param mutations
+   *          the mutations to enqueue.
+   * @throws InterruptedException
+   *           if the calling thread is interrupted while waiting for space;
+   *           mutations added before the interrupt remain in the queue.
+   */
+  public void put(List<RowMutation> mutations) throws InterruptedException {
+    for (RowMutation mutation : mutations) {
+      _queue.put(mutation);
+    }
+  }
+
+  /**
+   * Moves up to max queued mutations into the given list without blocking. If
+   * the queue is empty the list is left unchanged.
+   *
+   * @param mutations
+   *          the list that receives the drained mutations.
+   * @param max
+   *          the maximum number of mutations to transfer.
+   */
+  public void take(List<RowMutation> mutations, int max) {
+    _queue.drainTo(mutations, max);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
deleted file mode 100644
index a911465..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/QueueReader.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.writer;
-
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF;
-import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_QUEUE_READER_MAX;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.RowMutation;
-
-public abstract class QueueReader implements Closeable {
-
-  private static final Log LOG = LogFactory.getLog(QueueReader.class);
-
-  protected final ShardContext _shardContext;
-  protected final BlurIndex _index;
-  protected final long _backOff;
-  protected final Thread _daemon;
-  protected final AtomicBoolean _running = new AtomicBoolean();
-  protected final int _max;
-  protected final TableContext _tableContext;
-
-  public QueueReader(BlurIndex index, ShardContext shardContext) {
-    _running.set(true);
-    _index = index;
-    _shardContext = shardContext;
-    _tableContext = _shardContext.getTableContext();
-    BlurConfiguration configuration = _tableContext.getBlurConfiguration();
-    _backOff = configuration.getLong(BLUR_SHARD_INDEX_QUEUE_READER_BACKOFF, 500);
-    _max = configuration.getInt(BLUR_SHARD_INDEX_QUEUE_READER_MAX, 500);
-    _daemon = new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        List<RowMutation> mutations = new ArrayList<RowMutation>();
-        while (_running.get()) {
-          take(mutations, _max);
-          if (mutations.isEmpty()) {
-            try {
-              Thread.sleep(_backOff);
-            } catch (InterruptedException e) {
-              return;
-            }
-          } else {
-            MutatableAction mutatableAction = new MutatableAction(_shardContext);
-            mutatableAction.mutate(mutations);
-            try {
-              _index.process(mutatableAction);
-              success();
-            } catch (IOException e) {
-              failure();
-              LOG.error(
-                  "Unknown error during loading of rowmutations from queue [{0}] into table [{1}] and shard [{2}].",
-                  this.toString(), _tableContext.getTable(), _shardContext.getShard());
-            } finally {
-              mutations.clear();
-            }
-          }
-        }
-      }
-    });
-    _daemon.setName("Queue Loader for [" + _tableContext.getTable() + "/" + shardContext.getShard() + "]");
-    _daemon.setDaemon(true);
-  }
-
-  public void listen() {
-    _daemon.start();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (_running.get()) {
-      _running.set(false);
-      _daemon.interrupt();
-    }
-  }
-
-  /**
-   * Takes up to the max number of {@link RowMutation}s off the queue and
-   * returns. The implementation can choose to block until new items are
-   * available. However if the method returns without adding any items to the
-   * mutations list, the loading thread will back off a configurable amount of
-   * time. <br/>
-   * <br/>
-   * Configuration setting: &quot;blur.shard.index.queue.reader.backoff&quot;
-   * 
-   * @param mutations
-   * @param max
-   */
-  public abstract void take(List<RowMutation> mutations, int max);
-
-  /**
-   * This method will be called after each successful load of data from the
-   * queue. This will allow the queue to be notified that the information has
-   * been successfully loaded.
-   */
-  public abstract void success();
-
-  /**
-   * This method will be called after each failed load of data from the queue.
-   * This will allow the queue to be notified that the information has been WAS
-   * NOT successfully loaded.
-   */
-  public abstract void failure();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/ShardQueueReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/ShardQueueReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/ShardQueueReader.java
new file mode 100644
index 0000000..34bcf2e
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/ShardQueueReader.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.RowMutation;
+
+/**
+ * Queue reader bound to a single shard: each batch of {@link RowMutation}s
+ * handed to {@link #doMutate(List)} is applied to that shard's
+ * {@link BlurIndex}.
+ */
+public abstract class ShardQueueReader extends BaseQueueReader implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(ShardQueueReader.class);
+
+  private final BlurIndex _index;
+  private final ShardContext _shardContext;
+  private final TableContext _tableContext;
+
+  /**
+   * Creates a reader for the given shard. The table's BlurConfiguration and a
+   * "table/shard" context string are passed to {@link BaseQueueReader}.
+   *
+   * @param index
+   *          the index that receives the mutations.
+   * @param shardContext
+   *          the shard this reader loads data into.
+   */
+  public ShardQueueReader(BlurIndex index, ShardContext shardContext) {
+    super(shardContext.getTableContext().getBlurConfiguration(), getContext(shardContext));
+    _index = index;
+    _shardContext = shardContext;
+    _tableContext = shardContext.getTableContext();
+  }
+
+  /**
+   * Wraps the mutations in a {@link MutatableAction} and processes it against
+   * the index. Calls success() when processing completes and failure() when an
+   * IOException is thrown; the list is cleared in both cases.
+   *
+   * @param mutations
+   *          the batch to apply; emptied before this method returns from the
+   *          processing step.
+   */
+  @Override
+  protected void doMutate(List<RowMutation> mutations) {
+    MutatableAction mutatableAction = new MutatableAction(_shardContext);
+    mutatableAction.mutate(mutations);
+    try {
+      _index.process(mutatableAction);
+      success();
+    } catch (IOException e) {
+      failure();
+      // Pass the exception to the logger so the stack trace is not lost.
+      LOG.error("Unknown error during loading of rowmutations from queue [{0}] into table [{1}] and shard [{2}].",
+          e, this.toString(), _tableContext.getTable(), _shardContext.getShard());
+    } finally {
+      mutations.clear();
+    }
+  }
+
+  /**
+   * Builds the "table/shard" label for this reader. Uses the table name rather
+   * than the TableContext object itself, matching what TableQueueReader passes
+   * for its own context.
+   */
+  private static String getContext(ShardContext shardContext) {
+    return shardContext.getTableContext().getTable() + "/" + shardContext.getShard();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/manager/writer/TableQueueReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/TableQueueReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/TableQueueReader.java
new file mode 100644
index 0000000..2dc8101
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/TableQueueReader.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.RowMutation;
+
+/**
+ * Queue reader bound to a whole table: each batch of {@link RowMutation}s
+ * handed to {@link #doMutate(List)} is forwarded to the Blur client through
+ * enqueueMutateBatch.
+ */
+public abstract class TableQueueReader extends BaseQueueReader implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(TableQueueReader.class);
+
+  protected final TableContext _tableContext;
+  protected final Iface _client;
+
+  /**
+   * Creates a reader for the given table. The table's BlurConfiguration and
+   * the table name are passed to {@link BaseQueueReader}.
+   *
+   * @param client
+   *          the Blur client that receives the mutations.
+   * @param tableContext
+   *          the table this reader loads data into.
+   */
+  public TableQueueReader(Iface client, TableContext tableContext) {
+    super(tableContext.getBlurConfiguration(), tableContext.getTable());
+    _client = client;
+    _tableContext = tableContext;
+  }
+
+  /**
+   * Sends the batch to the client. Calls success() when the call returns and
+   * failure() when a BlurException or TException is thrown; the list is
+   * cleared in both cases.
+   *
+   * @param mutations
+   *          the batch to forward; emptied before this method returns.
+   */
+  @Override
+  protected void doMutate(List<RowMutation> mutations) {
+    try {
+      _client.enqueueMutateBatch(mutations);
+      success();
+    } catch (BlurException e) {
+      failure();
+      // Pass the exception to the logger so the stack trace is not lost.
+      LOG.error("Unknown error during loading of rowmutations from queue [{0}] into table [{1}].", e,
+          this.toString(), _tableContext.getTable());
+    } catch (TException e) {
+      failure();
+      LOG.error("Unknown error during loading of rowmutations from queue [{0}] into table [{1}].", e,
+          this.toString(), _tableContext.getTable());
+    } finally {
+      mutations.clear();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
index 472a2a9..53e59fe 100644
--- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
@@ -234,4 +234,14 @@ public class FilteredBlurServer implements Iface {
     _iface.resetLogging();
   }
 
+  /**
+   * Forwards the single-mutation enqueue request to _iface unchanged.
+   */
+  @Override
+  public void enqueueMutate(RowMutation mutation) throws BlurException, TException {
+    _iface.enqueueMutate(mutation);
+  }
+
+  /**
+   * Forwards the batch enqueue request to _iface unchanged.
+   */
+  @Override
+  public void enqueueMutateBatch(List<RowMutation> mutations) throws BlurException, TException {
+    _iface.enqueueMutateBatch(mutations);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index 53a912b..ea94823 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -50,7 +50,7 @@ import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
-import org.apache.blur.manager.writer.QueueReader;
+import org.apache.blur.manager.writer.ShardQueueReader;
 //import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.thrift.generated.ScoreType;
@@ -367,21 +367,21 @@ public class TableContext {
   }
 
   @SuppressWarnings("unchecked")
-  public QueueReader getQueueReader(BlurIndex blurIndex, ShardContext shardContext) throws IOException {
+  public ShardQueueReader getQueueReader(BlurIndex blurIndex, ShardContext shardContext) throws IOException {
     String className = _blurConfiguration.get(BLUR_SHARD_INDEX_QUEUE_READER_CLASS);
     if (className == null || className.trim().isEmpty()) {
       return null;
     }
-    Class<? extends QueueReader> clazz;
+    Class<? extends ShardQueueReader> clazz;
     try {
-      clazz = (Class<? extends QueueReader>) Class.forName(className);
+      clazz = (Class<? extends ShardQueueReader>) Class.forName(className);
     } catch (ClassNotFoundException e) {
       throw new IOException(e);
     }
     try {
-      Constructor<? extends QueueReader> constructor = clazz.getConstructor(new Class[] { BlurIndex.class,
+      Constructor<? extends ShardQueueReader> constructor = clazz.getConstructor(new Class[] { BlurIndex.class,
           ShardContext.class });
-      QueueReader reader = constructor.newInstance(blurIndex, shardContext);
+      ShardQueueReader reader = constructor.newInstance(blurIndex, shardContext);
 
       reader.listen();
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index f90dffb..f04c5ac 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -1065,6 +1065,38 @@ public class BlurControllerServer extends TableAdmin implements Iface {
     }
   }
 
+  /**
+   * Routes a single mutation to the node that hosts the shard for its row id
+   * and asks that node to enqueue it, using the mutate retry and delay
+   * settings of this controller.
+   *
+   * @param mutation
+   *          the row mutation to enqueue.
+   * @throws BlurException
+   *           if a check fails, the table layout is missing or incomplete, or
+   *           the remote call fails; non-BlurException errors are wrapped.
+   */
+  @Override
+  public void enqueueMutate(final RowMutation mutation) throws BlurException, TException {
+    try {
+      checkTable(mutation.table);
+      checkForUpdates(mutation.table);
+      MutationHelper.validateMutation(mutation);
+      String table = mutation.getTable();
+
+      int numberOfShards = getShardCount(table);
+      // Shard name -> node for this table. A missing layout is reported the
+      // same way as an incomplete one, matching enqueueMutateBatch, instead of
+      // surfacing as a NullPointerException.
+      Map<String, String> tableLayout = _shardServerLayout.get().get(table);
+      if (tableLayout == null || tableLayout.size() != numberOfShards) {
+        throw new BException("Cannot update data while shard is missing");
+      }
+
+      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
+      String node = tableLayout.get(shardName);
+      _client.execute(node, new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.enqueueMutate(mutation);
+          return null;
+        }
+      }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
+    } catch (Exception e) {
+      LOG.error("Unknown error during enqueue mutation of [{0}]", e, mutation);
+      // BlurExceptions pass through untouched; anything else is wrapped.
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException("Unknown error during enqueue mutation of [{0}]", e, mutation);
+    }
+  }
+
   private int getShardCount(String table) throws BlurException, TException {
     Integer numberOfShards = _tableShardCountMap.get(table);
     if (numberOfShards == null) {
@@ -1143,6 +1175,75 @@ public class BlurControllerServer extends TableAdmin implements Iface {
       throw new BException("Unknown error during mutation.", e);
     }
   }
+  
+  /**
+   * Groups the mutations by the node that hosts each row's shard, submits one
+   * enqueueMutateBatch call per node to the executor, and waits for all of
+   * them to finish.
+   *
+   * @param mutations
+   *          the row mutations to enqueue; they may target different tables.
+   * @throws BlurException
+   *           if validation or a table check fails, a table layout is missing
+   *           or incomplete, or any remote call fails; non-BlurException
+   *           errors are wrapped.
+   */
+  @Override
+  public void enqueueMutateBatch(List<RowMutation> mutations) throws BlurException, TException {
+    try {
+      // Validate the whole batch once, before any routing or remote call.
+      for (RowMutation mutation : mutations) {
+        MutationHelper.validateMutation(mutation);
+      }
+      // Node -> mutations destined for that node.
+      Map<String, List<RowMutation>> batches = new HashMap<String, List<RowMutation>>();
+      for (RowMutation mutation : mutations) {
+        checkTable(mutation.table);
+        checkForUpdates(mutation.table);
+
+        String table = mutation.getTable();
+
+        int numberOfShards = getShardCount(table);
+        Map<String, String> tableLayout = _shardServerLayout.get().get(table);
+        if (tableLayout == null || tableLayout.size() != numberOfShards) {
+          throw new BException("Cannot update data while shard is missing");
+        }
+
+        String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
+        String node = tableLayout.get(shardName);
+        List<RowMutation> list = batches.get(node);
+        if (list == null) {
+          list = new ArrayList<RowMutation>();
+          batches.put(node, list);
+        }
+        list.add(mutation);
+      }
+
+      List<Future<Void>> futures = new ArrayList<Future<Void>>();
+
+      for (Entry<String, List<RowMutation>> entry : batches.entrySet()) {
+        final String node = entry.getKey();
+        final List<RowMutation> mutationsLst = entry.getValue();
+        futures.add(_executor.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            return _client.execute(node, new BlurCommand<Void>() {
+              @Override
+              public Void call(Client client) throws BlurException, TException {
+                client.enqueueMutateBatch(mutationsLst);
+                return null;
+              }
+            }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
+          }
+        }));
+      }
+
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag so callers further up can see it.
+          Thread.currentThread().interrupt();
+          LOG.error("Unknown error during batch enqueue mutations", e);
+          throw new BException("Unknown error during batch enqueue mutations", e);
+        } catch (ExecutionException e) {
+          LOG.error("Unknown error during batch enqueue mutations", e.getCause());
+          throw new BException("Unknown error during batch enqueue mutations", e.getCause());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Unknown error during batch enqueue mutations", e);
+      // BlurExceptions pass through untouched; anything else is wrapped.
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException("Unknown error during batch enqueue mutations", e);
+    }
+  }
 
   @Override
   public void createSnapshot(final String table, final String name) throws BlurException, TException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index f16f814..d7ab206 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -367,6 +367,23 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
+  public void enqueueMutate(RowMutation mutation) throws BlurException, TException {
+    // Runs the table checks and mutation validation, then hands the mutation
+    // to _indexManager.enqueue.
+    try {
+      checkTable(_cluster, mutation.table);
+      checkForUpdates(_cluster, mutation.table);
+      resetSearchers();
+      MutationHelper.validateMutation(mutation);
+      _indexManager.enqueue(mutation);
+    } catch (Exception e) {
+      LOG.error("Unknown error during processing of [mutation={0}]", e, mutation);
+      // BlurExceptions pass through untouched; anything else is wrapped in a
+      // BException that carries the original message and cause.
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
   public void mutateBatch(List<RowMutation> mutations) throws BlurException, TException {
     try {
       resetSearchers();
@@ -385,6 +402,25 @@ public class BlurShardServer extends TableAdmin implements Iface {
     }
   }
 
+  /**
+   * Runs the table checks and validation for every mutation in the batch and
+   * then hands the whole list to _indexManager.enqueue. Nothing is enqueued if
+   * any check or validation throws.
+   */
+  @Override
+  public void enqueueMutateBatch(List<RowMutation> mutations) throws BlurException, TException {
+    try {
+      resetSearchers();
+      for (RowMutation mutation : mutations) {
+        checkTable(_cluster, mutation.table);
+        checkForUpdates(_cluster, mutation.table);
+        MutationHelper.validateMutation(mutation);
+      }
+      _indexManager.enqueue(mutations);
+    } catch (Exception e) {
+      LOG.error("Unknown error during processing of [mutations={0}]", e, mutations);
+      // BlurExceptions pass through untouched; anything else is wrapped in a
+      // BException that carries the original message and cause.
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
   public long getMaxTimeToLive() {
     return _maxTimeToLive;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
index 4a66853..b31e30d 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -35,7 +36,11 @@ import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
 import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.trace.BaseTraceStorage;
 import org.apache.blur.trace.Trace;
@@ -52,6 +57,7 @@ import org.junit.Test;
 
 public class BlurIndexSimpleWriterTest {
 
+  private static final int TOTAL_ROWS_FOR_TESTS = 10000;
   private static final String TEST_TABLE = "test-table";
   private static final int TEST_NUMBER_WAIT_VISIBLE = 100;
   private static final int TEST_NUMBER = 50000;
@@ -84,19 +90,16 @@ public class BlurIndexSimpleWriterTest {
     _indexWarmup = new DefaultBlurIndexWarmup(1000000);
   }
 
-  private void setupWriter(Configuration configuration, long refresh, boolean reload) throws IOException {
+  private void setupWriter(Configuration configuration) throws IOException {
+    setupWriter(configuration, false);
+  }
+
+  private void setupWriter(Configuration configuration, boolean reload) throws IOException {
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setName(TEST_TABLE);
-    /*
-     * if reload is set to true...we create a new writer instance pointing to
-     * the same location as the old one..... so previous writer instances should
-     * be closed
-     */
-
     if (!reload && uuid == null) {
       uuid = UUID.randomUUID().toString();
     }
-
     tableDescriptor.setTableUri(new File(_base, "table-store-" + uuid).toURI().toString());
     TableContext tableContext = TableContext.create(tableDescriptor);
     File path = new File(_base, "index_" + uuid);
@@ -128,7 +131,7 @@ public class BlurIndexSimpleWriterTest {
 
   @Test
   public void testRollbackAndReopen() throws IOException {
-    setupWriter(_configuration, 5, false);
+    setupWriter(_configuration);
     {
       IndexSearcherClosable searcher = _writer.getIndexSearcher();
       IndexReader reader = searcher.getIndexReader();
@@ -164,7 +167,7 @@ public class BlurIndexSimpleWriterTest {
 
   @Test
   public void testBlurIndexWriter() throws IOException {
-    setupWriter(_configuration, 5, false);
+    setupWriter(_configuration);
     long s = System.nanoTime();
     int total = 0;
     TraceStorage oldStorage = Trace.getStorage();
@@ -204,7 +207,7 @@ public class BlurIndexSimpleWriterTest {
 
   @Test
   public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
-    setupWriter(_configuration, 100, false);
+    setupWriter(_configuration);
     IndexSearcherClosable searcher1 = _writer.getIndexSearcher();
     IndexReader reader1 = searcher1.getIndexReader();
     assertEquals(0, reader1.numDocs());
@@ -246,7 +249,7 @@ public class BlurIndexSimpleWriterTest {
 
   @Test
   public void testCreateSnapshot() throws IOException {
-    setupWriter(_configuration, 5, false);
+    setupWriter(_configuration);
     _writer.createSnapshot("test_snapshot");
     assertTrue(_writer.getSnapshots().contains("test_snapshot"));
 
@@ -258,13 +261,13 @@ public class BlurIndexSimpleWriterTest {
     // create a new writer instance and test whether the snapshots are loaded
     // properly
     _writer.close();
-    setupWriter(_configuration, 5, true);
+    setupWriter(_configuration, true);
     assertTrue(_writer.getSnapshots().contains("test_snapshot"));
   }
 
   @Test
   public void testRemoveSnapshots() throws IOException {
-    setupWriter(_configuration, 5, false);
+    setupWriter(_configuration);
     Path snapshotsDirPath = _writer.getSnapshotsDirectoryPath();
     FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
     fileSystem.mkdirs(snapshotsDirPath);
@@ -274,12 +277,53 @@ public class BlurIndexSimpleWriterTest {
 
     // re-load the writer to load the snpshots
     _writer.close();
-    setupWriter(_configuration, 5, true);
+    setupWriter(_configuration, true);
     assertEquals(2, _writer.getSnapshots().size());
 
     _writer.removeSnapshot("test_snapshot2");
     assertEquals(1, _writer.getSnapshots().size());
     assertTrue(!_writer.getSnapshots().contains("test_snapshot2"));
+  }
 
+  /**
+   * Enqueues TOTAL_ROWS_FOR_TESTS single-record rows from a background thread
+   * and polls the writer until that many documents are visible. The wait is
+   * bounded so a failed or stalled enqueue fails the test instead of hanging
+   * the build.
+   */
+  @Test
+  public void testEnqueue() throws IOException, InterruptedException {
+    setupWriter(_configuration);
+    final String table = _writer.getShardContext().getTableContext().getTable();
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        for (int i = 0; i < TOTAL_ROWS_FOR_TESTS; i++) {
+          try {
+            _writer.enqueue(Arrays.asList(genRowMutation(table)));
+          } catch (IOException e) {
+            // A lost row means the expected count is never reached; the
+            // deadline below turns that into a test failure.
+            e.printStackTrace();
+          }
+        }
+      }
+    });
+    // Daemon so a producer stuck in enqueue cannot keep the JVM alive after a
+    // timeout failure.
+    thread.setDaemon(true);
+    thread.start();
+    // NOTE(review): two minutes is a guess at a generous upper bound; tune it
+    // if slow build machines need longer.
+    long deadline = System.currentTimeMillis() + 120000L;
+    // NOTE(review): each poll obtains a new searcher that is never released
+    // here; confirm whether it should be closed.
+    while (_writer.getIndexSearcher().getIndexReader().numDocs() != TOTAL_ROWS_FOR_TESTS) {
+      if (System.currentTimeMillis() > deadline) {
+        fail("Timed out waiting for [" + TOTAL_ROWS_FOR_TESTS + "] enqueued rows to become visible.");
+      }
+      Thread.sleep(100);
+    }
+    thread.join();
+  }
+
+  /**
+   * Builds a REPLACE_ROW mutation for the given table with a random row id
+   * and one record in family "testing" that has a random record id and ten
+   * columns (col0..col9) holding random long values.
+   */
+  private RowMutation genRowMutation(String table) {
+    RowMutation rowMutation = new RowMutation();
+    rowMutation.setRowId(Long.toString(random.nextLong()));
+    rowMutation.setTable(table);
+    rowMutation.setRowMutationType(RowMutationType.REPLACE_ROW);
+    Record record = new Record();
+    record.setFamily("testing");
+    record.setRecordId(Long.toString(random.nextLong()));
+    for (int i = 0; i < 10; i++) {
+      record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
+    }
+    rowMutation.addToRecordMutations(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
+    return rowMutation;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index 5e11ac3..821065d 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -36,6 +36,7 @@ import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -187,6 +188,11 @@ public class IndexImporterTest {
       public void close() throws IOException {
         throw new RuntimeException("Not Implemented");
       }
+
+      // Stub: this test double does not support enqueue and throws if it is
+      // ever called.
+      @Override
+      public void enqueue(List<RowMutation> mutations) {
+        throw new RuntimeException("Not Implemented");
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/test/java/org/apache/blur/manager/writer/QueueReaderBasicInMemory.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/QueueReaderBasicInMemory.java b/blur-core/src/test/java/org/apache/blur/manager/writer/QueueReaderBasicInMemory.java
index 9c9507a..976aabe 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/QueueReaderBasicInMemory.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/QueueReaderBasicInMemory.java
@@ -23,7 +23,7 @@ import java.util.concurrent.BlockingQueue;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.thrift.generated.RowMutation;
 
-public class QueueReaderBasicInMemory extends QueueReader {
+public class QueueReaderBasicInMemory extends ShardQueueReader {
 
   static final BlockingQueue<RowMutation> _queue = new ArrayBlockingQueue<RowMutation>(1000);
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index 7603d88..b332f5c 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -61,6 +61,7 @@ import org.apache.blur.thrift.generated.Schema;
 import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
 import org.apache.blur.thrift.util.BlurThriftHelper;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.GCWatcher;
@@ -127,6 +128,37 @@ public class BlurClusterTest {
   }
 
   @Test
+  public void testEnqueue() throws BlurException, TException, InterruptedException, IOException {
+    String tableName = "testEnqueue";
+    createTable(tableName);
+    Blur.Iface client = getClient();
+
+    long s = System.currentTimeMillis();
+    int count = 10000;
+    for (int i = 0; i < count; i++) {
+      String rowId = UUID.randomUUID().toString();
+      RecordMutation mutation = BlurThriftHelper.newRecordMutation("test", rowId,
+          BlurThriftHelper.newColumn("test", "value"));
+      RowMutation rowMutation = BlurThriftHelper.newRowMutation(tableName, rowId, mutation);
+      client.enqueueMutate(rowMutation);
+    }
+    long e = System.currentTimeMillis();
+    double seconds = (e - s) / 1000.0;
+    double rate = count / seconds;
+    System.out.println("Load row in queue at " + rate + "/s");
+
+    for (int i = 0; i < 60; i++) {
+      TableStats stats = client.tableStats(tableName);
+      long rowCount = stats.getRowCount();
+      if (rowCount == count) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    fail("Test failed to load all rows.");
+  }
+
+  @Test
   public void testBlurQueryWithRowId() throws BlurException, TException, InterruptedException, IOException {
     String tableName = "testBlurQueryWithRowId";
     createTable(tableName);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8de60c97/blur-shell/src/main/java/org/apache/blur/shell/LoadTestDataCommand.java
----------------------------------------------------------------------
diff --git a/blur-shell/src/main/java/org/apache/blur/shell/LoadTestDataCommand.java b/blur-shell/src/main/java/org/apache/blur/shell/LoadTestDataCommand.java
index a4516e2..8e0b865 100644
--- a/blur-shell/src/main/java/org/apache/blur/shell/LoadTestDataCommand.java
+++ b/blur-shell/src/main/java/org/apache/blur/shell/LoadTestDataCommand.java
@@ -30,11 +30,12 @@ public class LoadTestDataCommand extends Command implements TableFirstArgCommand
   @Override
   public void doit(PrintWriter out, Blur.Iface client, String[] args) throws CommandException, TException,
       BlurException {
-    if (args.length != 8) {
+    if (args.length != 9) {
       throw new CommandException("Invalid args: " + help());
     }
     int c = 1;
     String table = args[c++];
+    boolean enqueue = Boolean.parseBoolean(args[c++]);
     int numberRows = Integer.parseInt(args[c++]);
     int numberRecordsPerRow = Integer.parseInt(args[c++]);
     int numberOfFamilies = Integer.parseInt(args[c++]);
@@ -42,7 +43,7 @@ public class LoadTestDataCommand extends Command implements TableFirstArgCommand
     int numberOfWords = Integer.parseInt(args[c++]);
     int batch = Integer.parseInt(args[c++]);
     try {
-      LoadData.runLoad(client, table, numberRows, numberRecordsPerRow, numberOfFamilies, numberOfColumns,
+      LoadData.runLoad(client, enqueue, table, numberRows, numberRecordsPerRow, numberOfFamilies, numberOfColumns,
           numberOfWords, batch, out);
     } catch (IOException e) {
       out.println("Error " + e.getMessage());
@@ -59,7 +60,7 @@ public class LoadTestDataCommand extends Command implements TableFirstArgCommand
 
   @Override
   public String usage() {
-    return "<tablename> <rows> <recordsPerRow> <families> <columnsPerRecord> <wordsPerColumn> <batchSize>";
+    return "<tablename> <queue true/false> <rows> <recordsPerRow> <families> <columnsPerRecord> <wordsPerColumn> <batchSize>";
   }
 
   @Override


Mime
View raw message