incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/4] git commit: Big changes to the entire mutate stack, huge performance improvements. Also there is no longer any delay in data visibility.
Date Mon, 06 Jan 2014 19:52:32 GMT
Big changes to the entire mutate stack, huge performance improvements.  Also there is no longer any delay in data visibility.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/813ccd7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/813ccd7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/813ccd7d

Branch: refs/heads/apache-blur-0.2
Commit: 813ccd7d0bed42e75d9bb82ad784b2248498f037
Parents: a0be2c9
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jan 6 14:49:19 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jan 6 14:49:19 2014 -0500

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   | 203 +++-------
 .../manager/indexserver/LocalIndexServer.java   |   9 +-
 .../apache/blur/manager/writer/BlurIndex.java   |   9 +-
 .../blur/manager/writer/BlurIndexReadOnly.java  |   5 +
 .../blur/manager/writer/BlurIndexReader.java    |   5 +
 .../manager/writer/BlurIndexSimpleWriter.java   |  13 +
 .../blur/manager/writer/BlurNRTIndex.java       |   5 +
 .../blur/manager/writer/MutatableAction.java    | 376 +++++++++++++++++++
 .../apache/blur/manager/IndexManagerTest.java   |   2 +-
 .../writer/BlurIndexSimpleWriterTest.java       |   3 +-
 .../blur/manager/writer/IndexImporterTest.java  |  90 ++---
 .../manager/writer/MutatableActionTest.java     | 287 ++++++++++++++
 .../apache/blur/store/hdfs/HdfsDirectory.java   |   8 +
 13 files changed, 807 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index c258769..c72a227 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -18,7 +18,6 @@ package org.apache.blur.manager;
  */
 import static org.apache.blur.metrics.MetricsConstants.BLUR;
 import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
-import static org.apache.blur.thrift.util.BlurThriftHelper.findRecordMutation;
 import static org.apache.blur.utils.BlurConstants.FAMILY;
 import static org.apache.blur.utils.BlurConstants.PRIME_DOC;
 import static org.apache.blur.utils.BlurConstants.RECORD_ID;
@@ -28,9 +27,9 @@ import static org.apache.blur.utils.RowDocumentUtil.getRow;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -60,7 +59,9 @@ import org.apache.blur.manager.results.MergerBlurResultIterable;
 import org.apache.blur.manager.status.QueryStatus;
 import org.apache.blur.manager.status.QueryStatusManager;
 import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.manager.writer.MutatableAction;
 import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.ShardServerContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.BException;
@@ -68,7 +69,6 @@ import org.apache.blur.thrift.MutationHelper;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurQueryStatus;
-import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.ErrorType;
 import org.apache.blur.thrift.generated.Facet;
 import org.apache.blur.thrift.generated.FetchResult;
@@ -141,8 +141,8 @@ public class IndexManager {
   private final IndexServer _indexServer;
   private final ClusterStatus _clusterStatus;
   private final ExecutorService _executor;
-  private final ExecutorService _mutateExecutor;
   private final ExecutorService _facetExecutor;
+  private final ExecutorService _mutateExecutor;
 
   private final QueryStatusManager _statusManager = new QueryStatusManager();
   private final AtomicBoolean _closed = new AtomicBoolean(false);
@@ -924,7 +924,7 @@ public class IndexManager {
 
   public void mutate(final RowMutation mutation) throws BlurException, IOException {
     long s = System.nanoTime();
-    doMutate(mutation);
+    doMutates(Arrays.asList(mutation));
     long e = System.nanoTime();
     LOG.debug("doMutate took [{0} ms] to complete", (e - s) / 1000000.0);
   }
@@ -964,13 +964,7 @@ public class IndexManager {
     for (Entry<String, List<RowMutation>> entry : mutationsByShard.entrySet()) {
       final String shard = entry.getKey();
       final List<RowMutation> value = entry.getValue();
-      futures.add(_mutateExecutor.submit(new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          executeMutates(table, shard, indexes, value);
-          return null;
-        }
-      }));
+      futures.add(executeMutates(table, shard, indexes, value));
     }
 
     for (Future<Void> future : futures) {
@@ -984,42 +978,46 @@ public class IndexManager {
     }
   }
 
-  private void executeMutates(String table, String shard, Map<String, BlurIndex> indexes, List<RowMutation> mutations)
-      throws BlurException, IOException {
+  private Future<Void> executeMutates(String table, String shard, Map<String, BlurIndex> indexes,
+      List<RowMutation> mutations) throws BlurException, IOException {
     long s = System.nanoTime();
-    boolean waitToBeVisible = false;
-    for (int i = 0; i < mutations.size(); i++) {
-      RowMutation mutation = mutations.get(i);
-      if (mutation.waitToBeVisible) {
-        waitToBeVisible = true;
-      }
-      BlurIndex blurIndex = indexes.get(shard);
+    try {
+      final BlurIndex blurIndex = indexes.get(shard);
       if (blurIndex == null) {
         throw new BException("Shard [" + shard + "] in table [" + table + "] is not being served by this server.");
       }
-
-      boolean waitVisiblity = false;
-      if (i + 1 == mutations.size()) {
-        waitVisiblity = waitToBeVisible;
-      }
-      RowMutationType type = mutation.rowMutationType;
-      switch (type) {
-      case REPLACE_ROW:
-        Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
-        blurIndex.replaceRow(waitVisiblity, mutation.wal, updateMetrics(row));
-        break;
-      case UPDATE_ROW:
-        doUpdateRowMutation(mutation, blurIndex);
-        break;
-      case DELETE_ROW:
-        blurIndex.deleteRow(waitVisiblity, mutation.wal, mutation.rowId);
-        break;
-      default:
-        throw new RuntimeException("Not supported [" + type + "]");
+      ShardContext shardContext = blurIndex.getShardContext();
+      final MutatableAction mutatableAction = new MutatableAction(shardContext);
+      for (int i = 0; i < mutations.size(); i++) {
+        RowMutation mutation = mutations.get(i);
+        RowMutationType type = mutation.rowMutationType;
+        switch (type) {
+        case REPLACE_ROW:
+          Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
+          mutatableAction.replaceRow(updateMetrics(row));
+          break;
+        case UPDATE_ROW:
+          doUpdateRowMutation(mutation, mutatableAction);
+          break;
+        case DELETE_ROW:
+          mutatableAction.deleteRow(mutation.rowId);
+          break;
+        default:
+          throw new RuntimeException("Not supported [" + type + "]");
+        }
       }
+
+      return _mutateExecutor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          blurIndex.process(mutatableAction);
+          return null;
+        }
+      });
+    } finally {
+      long e = System.nanoTime();
+      LOG.debug("executeMutates took [{0} ms] to complete", (e - s) / 1000000.0);
     }
-    long e = System.nanoTime();
-    LOG.debug("executeMutates took [" + (e - s) / 1000000.0 + " ms] to complete");
   }
 
   private Map<String, List<RowMutation>> getMutatesPerTable(List<RowMutation> mutations) {
@@ -1036,33 +1034,6 @@ public class IndexManager {
     return map;
   }
 
-  private void doMutate(RowMutation mutation) throws BlurException, IOException {
-    String table = mutation.table;
-    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
-    MutationHelper.validateMutation(mutation);
-    String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table), _blurPartitioner);
-    BlurIndex blurIndex = indexes.get(shard);
-    if (blurIndex == null) {
-      throw new BException("Shard [" + shard + "] in table [" + table + "] is not being served by this server.");
-    }
-
-    RowMutationType type = mutation.rowMutationType;
-    switch (type) {
-    case REPLACE_ROW:
-      Row row = MutationHelper.getRowFromMutations(mutation.rowId, mutation.recordMutations);
-      blurIndex.replaceRow(mutation.waitToBeVisible, mutation.wal, updateMetrics(row));
-      break;
-    case UPDATE_ROW:
-      doUpdateRowMutation(mutation, blurIndex);
-      break;
-    case DELETE_ROW:
-      blurIndex.deleteRow(mutation.waitToBeVisible, mutation.wal, mutation.rowId);
-      break;
-    default:
-      throw new RuntimeException("Not supported [" + type + "]");
-    }
-  }
-
   private Row updateMetrics(Row row) {
     _writeRowMeter.mark();
     List<Record> records = row.getRecords();
@@ -1072,102 +1043,30 @@ public class IndexManager {
     return row;
   }
 
-  private void doUpdateRowMutation(RowMutation mutation, BlurIndex blurIndex) throws BlurException, IOException {
-    FetchResult fetchResult = new FetchResult();
-    Selector selector = new Selector();
-    selector.setRowId(mutation.rowId);
-    fetchRow(mutation.table, selector, fetchResult, true);
-    Row existingRow;
-    if (fetchResult.exists) {
-      // We will examine the contents of the existing row and add records
-      // onto a new replacement row based on the mutation we have been given.
-      existingRow = fetchResult.rowResult.row;
-    } else {
-      // The row does not exist, create empty new row.
-      existingRow = new Row().setId(mutation.getRowId());
-      existingRow.records = new ArrayList<Record>();
-    }
-    Row newRow = new Row().setId(existingRow.id);
-
-    // Create a local copy of the mutation we can modify
-    RowMutation mutationCopy = mutation.deepCopy();
-
-    // Match existing records against record mutations. Once a record
-    // mutation has been processed, remove it from our local copy.
-    for (Record existingRecord : existingRow.records) {
-      RecordMutation recordMutation = findRecordMutation(mutationCopy, existingRecord);
-      if (recordMutation != null) {
-        mutationCopy.recordMutations.remove(recordMutation);
-        doUpdateRecordMutation(recordMutation, existingRecord, newRow);
-      } else {
-        // Copy existing records over to the new row unmodified if there
-        // is no matching mutation.
-        newRow.addToRecords(existingRecord);
-      }
-    }
+  private void doUpdateRowMutation(RowMutation mutation, MutatableAction mutatableAction) throws BlurException,
+      IOException {
+    String rowId = mutation.getRowId();
 
-    // Examine all remaining record mutations. For any record replacements
-    // we need to create a new record in the table even though an existing
-    // record did not match. Record deletions are also ok here since the
-    // record is effectively already deleted. Other record mutations are
-    // an error and should generate an exception.
-    for (RecordMutation recordMutation : mutationCopy.recordMutations) {
+    for (RecordMutation recordMutation : mutation.getRecordMutations()) {
       RecordMutationType type = recordMutation.recordMutationType;
+      Record record = recordMutation.getRecord();
       switch (type) {
       case DELETE_ENTIRE_RECORD:
-        // do nothing as missing record is already in desired state
+        mutatableAction.deleteRecord(rowId, record.getRecordId());
         break;
       case APPEND_COLUMN_VALUES:
+        mutatableAction.appendColumns(rowId, record);
+        break;
       case REPLACE_ENTIRE_RECORD:
+        mutatableAction.replaceRecord(rowId, record);
+        break;
       case REPLACE_COLUMNS:
-        // If record do not exist, create new record in Row
-        newRow.addToRecords(recordMutation.record);
+        mutatableAction.replaceColumns(rowId, record);
         break;
       default:
         throw new RuntimeException("Unsupported record mutation type [" + type + "]");
       }
     }
-
-    // Finally, replace the existing row with the new row we have built.
-    blurIndex.replaceRow(mutation.waitToBeVisible, mutation.wal, updateMetrics(newRow));
-
-  }
-
-  private static void doUpdateRecordMutation(RecordMutation recordMutation, Record existingRecord, Row newRow) {
-    Record mutationRecord = recordMutation.record;
-    switch (recordMutation.recordMutationType) {
-    case DELETE_ENTIRE_RECORD:
-      return;
-    case APPEND_COLUMN_VALUES:
-      for (Column column : mutationRecord.columns) {
-        if (column.getValue() == null) {
-          continue;
-        }
-        existingRecord.addToColumns(column);
-      }
-      newRow.addToRecords(existingRecord);
-      break;
-    case REPLACE_ENTIRE_RECORD:
-      newRow.addToRecords(mutationRecord);
-      break;
-    case REPLACE_COLUMNS:
-      Set<String> removeColumnNames = new HashSet<String>();
-      for (Column column : mutationRecord.getColumns()) {
-        removeColumnNames.add(column.getName());
-      }
-
-      for (Column column : existingRecord.getColumns()) {
-        // skip columns in existing record that are contained in the mutation
-        // record
-        if (!removeColumnNames.contains(column.getName())) {
-          mutationRecord.addToColumns(column);
-        }
-      }
-      newRow.addToRecords(mutationRecord);
-      break;
-    default:
-      break;
-    }
   }
 
   private int getNumberOfShards(String table) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
index e0c6b38..f851875 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
@@ -36,7 +36,8 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
 import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.manager.writer.BlurNRTIndex;
+import org.apache.blur.manager.writer.BlurIndexCloser;
+import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
@@ -68,6 +69,7 @@ public class LocalIndexServer extends AbstractIndexServer {
   private final Closer _closer;
   private final boolean _ramDir;
   private final BlurIndexWarmup _indexWarmup;
+  private final BlurIndexCloser _indexCloser;
 
   public LocalIndexServer(TableDescriptor tableDescriptor) throws IOException {
     this(tableDescriptor, false);
@@ -81,6 +83,7 @@ public class LocalIndexServer extends AbstractIndexServer {
     _searchExecutor = Executors.newCachedThreadPool();
     _closer.register(new CloseableExecutorService(_searchExecutor));
     _ramDir = ramDir;
+    _indexCloser = _closer.register(new BlurIndexCloser());
     getIndexes(_tableContext.getTable());
     _indexWarmup = BlurIndexWarmup.getIndexWarmup(_tableContext.getBlurConfiguration());
   }
@@ -158,8 +161,8 @@ public class LocalIndexServer extends AbstractIndexServer {
 
   private BlurIndex openIndex(String table, String shard, Directory dir) throws CorruptIndexException, IOException {
     ShardContext shardContext = ShardContext.create(_tableContext, shard);
-    BlurNRTIndex index = new BlurNRTIndex(shardContext, dir, _mergeScheduler, _gc, _searchExecutor, null, null,
-        _indexWarmup);
+    BlurIndexSimpleWriter index = new BlurIndexSimpleWriter(shardContext, dir, _mergeScheduler, _gc, _searchExecutor,
+        _indexCloser, null, _indexWarmup);
     return index;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index f70401d..8e52dd3 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -40,11 +40,12 @@ public abstract class BlurIndex {
   private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1);
   private long _lastMemoryCheck = 0;
   private long _memoryUsage = 0;
+  protected ShardContext _shardContext;
 
   public BlurIndex(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
       DirectoryReferenceFileGC gc, ExecutorService searchExecutor, BlurIndexCloser indexCloser,
       BlurIndexRefresher refresher, BlurIndexWarmup indexWarmup) throws IOException {
-
+    _shardContext = shardContext;
   }
 
   public abstract void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException;
@@ -133,4 +134,10 @@ public abstract class BlurIndex {
     }
   }
 
+  public ShardContext getShardContext() {
+    return _shardContext;
+  }
+
+  public abstract void process(MutatableAction mutatableAction) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
index 3009a87..06d3979 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
@@ -72,4 +72,9 @@ public class BlurIndexReadOnly extends BlurIndex {
     return _blurIndex.getSnapshots();
   }
 
+  @Override
+  public void process(MutatableAction mutatableAction) {
+    throw new RuntimeException("Read-only shard");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index 4071777..c852e75 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -153,4 +153,9 @@ public class BlurIndexReader extends BlurIndex {
   public List<String> getSnapshots() throws IOException {
     throw new RuntimeException("Read-only shard");
   }
+
+  @Override
+  public void process(MutatableAction mutatableAction) {
+    throw new RuntimeException("Read-only shard");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 7620108..d56df25 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -279,4 +279,17 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     trace3.done();
   }
 
+  @Override
+  public void process(MutatableAction mutatableAction) throws IOException {
+    _writeLock.lock();
+    try {
+      waitUntilNotNull(_writer);
+      BlurIndexWriter writer = _writer.get();
+      mutatableAction.performMutate(_indexReader.get(), writer);
+      commit();
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index e1192b1..45215cf 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -440,4 +440,9 @@ public class BlurNRTIndex extends BlurIndex {
     Configuration configuration = _shardContext.getTableContext().getConfiguration();
     return shardHdfsDirPath.getFileSystem(configuration);
   }
+
+  @Override
+  public void process(MutatableAction mutatableAction) {
+    throw new RuntimeException("Not supported.");
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
new file mode 100644
index 0000000..ccb1fa6
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.blur.analysis.FieldManager;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurThriftRecord;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.ResetableDocumentStoredFieldVisitor;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+
+public class MutatableAction {
+
+  static abstract class InternalAction {
+    abstract void performAction(IndexReader reader, IndexWriter writer) throws IOException;
+  }
+
+  private final List<InternalAction> _actions = new ArrayList<InternalAction>();
+  private final FieldManager _fieldManager;
+  private final String _shard;
+  private final String _table;
+  private final Term _primeDocTerm;
+  private final int _maxHeap = Integer.MAX_VALUE;
+
+  public MutatableAction(ShardContext context) {
+    TableContext tableContext = context.getTableContext();
+    _shard = context.getShard();
+    _table = tableContext.getTable();
+    _fieldManager = tableContext.getFieldManager();
+    _primeDocTerm = tableContext.getDefaultPrimeDocTerm();
+  }
+
+  public void deleteRow(final String rowId) {
+    _actions.add(new InternalAction() {
+      @Override
+      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
+        writer.deleteDocuments(createRowId(rowId));
+      }
+    });
+  }
+
+  public void replaceRow(final Row row) {
+    _actions.add(new InternalAction() {
+      @Override
+      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
+        List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
+        Term rowId = createRowId(row.getId());
+        writer.updateDocuments(rowId, docs);
+      }
+    });
+  }
+
+  public void deleteRecord(final String rowId, final String recordId) {
+    _actions.add(new InternalAction() {
+      @Override
+      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
+        Term rowIdTerm = createRowId(rowId);
+        BooleanQuery query = new BooleanQuery();
+        query.add(new TermQuery(rowIdTerm), Occur.MUST);
+        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST);
+
+        IndexSearcher searcher = new IndexSearcher(reader);
+        TopDocs topDocs = searcher.search(query, 1);
+        if (topDocs.totalHits == 0) {
+          // do nothing
+        } else if (topDocs.totalHits == 1) {
+          Selector selector = new Selector();
+          selector.setStartRecord(0);
+          selector.setMaxRecordsToFetch(Integer.MAX_VALUE);
+          selector.setLocationId(_shard + "/" + topDocs.scoreDocs[0].doc);
+          ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
+          List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
+              _maxHeap, _table + "/" + _shard, _primeDocTerm, null));
+
+          boolean found = false;
+          for (int i = 0; i < docs.size(); i++) {
+            Document document = docs.get(i);
+            if (document.get(BlurConstants.RECORD_ID).equals(recordId)) {
+              docs.remove(i);
+              found = true;
+              break;
+            }
+          }
+          if (found) {
+            if (docs.size() == 0) {
+              writer.deleteDocuments(rowIdTerm);
+            } else {
+              Row row = new Row(rowId, toRecords(docs), docs.size());
+              List<List<Field>> docsToUpdate = TransactionRecorder.getDocs(row, _fieldManager);
+              writer.updateDocuments(rowIdTerm, docsToUpdate);
+            }
+          }
+        } else {
+          throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
+        }
+      }
+    });
+  }
+
+  public void replaceRecord(final String rowId, final Record record) {
+    _actions.add(new InternalAction() {
+      @Override
+      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
+        Term rowIdTerm = createRowId(rowId);
+        BooleanQuery query = new BooleanQuery();
+        query.add(new TermQuery(rowIdTerm), Occur.MUST);
+        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST);
+
+        IndexSearcher searcher = new IndexSearcher(reader);
+        TopDocs topDocs = searcher.search(query, 1);
+        if (topDocs.totalHits == 0) {
+          // just add
+          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, record);
+          doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+          writer.addDocument(doc);
+        } else if (topDocs.totalHits == 1) {
+          Selector selector = new Selector();
+          selector.setStartRecord(0);
+          selector.setMaxRecordsToFetch(Integer.MAX_VALUE);
+          selector.setLocationId(_shard + "/" + topDocs.scoreDocs[0].doc);
+          ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
+          List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
+              _maxHeap, _table + "/" + _shard, _primeDocTerm, null));
+          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, record);
+
+          for (int i = 0; i < docs.size(); i++) {
+            Document document = docs.get(i);
+            if (document.get(BlurConstants.RECORD_ID).equals(record.getRecordId())) {
+              docs.remove(i);
+              break;
+            }
+          }
+          docs.add(toDocument(doc));
+          Row row = new Row(rowId, toRecords(docs), docs.size());
+          List<List<Field>> docsToUpdate = TransactionRecorder.getDocs(row, _fieldManager);
+          writer.updateDocuments(rowIdTerm, docsToUpdate);
+        } else {
+          throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
+        }
+      }
+    });
+  }
+
+  private List<Record> toRecords(List<Document> docs) {
+    List<Record> records = new ArrayList<Record>();
+    for (Document document : docs) {
+      BlurThriftRecord existingRecord = new BlurThriftRecord();
+      RowDocumentUtil.readRecord(document, existingRecord);
+      records.add(existingRecord);
+    }
+    return records;
+  }
+
+  public void appendColumns(final String rowId, final Record record) { // queues an action that adds record's columns to the matching record of rowId
+    _actions.add(new InternalAction() { // nothing runs here; performMutate invokes the queued action later
+      @Override
+      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
+        Term rowIdTerm = createRowId(rowId);
+        BooleanQuery query = new BooleanQuery();
+        query.add(new TermQuery(rowIdTerm), Occur.MUST);
+        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST); // both clauses required: the prime doc of this row id
+
+        IndexSearcher searcher = new IndexSearcher(reader);
+        TopDocs topDocs = searcher.search(query, 1); // totalHits still counts every match, only scoreDocs is capped at 1
+        if (topDocs.totalHits == 0) {
+          // just add
+          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, record);
+          doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO)); // the only doc of a new row gets the prime-doc marker
+          writer.addDocument(doc);
+        } else if (topDocs.totalHits == 1) {
+          Selector selector = new Selector();
+          selector.setStartRecord(0);
+          selector.setMaxRecordsToFetch(Integer.MAX_VALUE); // no cap on the number of records fetched
+          selector.setLocationId(_shard + "/" + topDocs.scoreDocs[0].doc); // location id is "<shard>/<prime doc id>"
+          ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
+          List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
+              _maxHeap, _table + "/" + _shard, _primeDocTerm, null)); // copied into an ArrayList because entries are removed/added below
+          BlurThriftRecord existingRecord = new BlurThriftRecord(); // stays empty when no stored doc has the record id
+          for (int i = 0; i < docs.size(); i++) {
+            Document document = docs.get(i);
+            if (document.get(BlurConstants.RECORD_ID).equals(record.getRecordId())) {
+              Document doc = docs.remove(i); // take the old version out; the merged one is re-added below
+              RowDocumentUtil.readRecord(doc, existingRecord);
+              break;
+            }
+          }
+
+          String recordId = existingRecord.getRecordId();
+          if (recordId == null) { // no existing record was found: adopt the incoming id
+            existingRecord.setRecordId(record.getRecordId());
+          } else if (!recordId.equals(record.getRecordId())) {
+            throw new IOException("Record ids do not match.");
+          }
+
+          String family = existingRecord.getFamily();
+          if (family == null) { // same adopt-or-verify rule for the family
+            existingRecord.setFamily(record.getFamily());
+          } else if (!family.equals(record.getFamily())) {
+            throw new IOException("Record family do not match.");
+          }
+
+          for (Column column : record.getColumns()) { // append: existing columns are kept, new ones added after them
+            existingRecord.addToColumns(column);
+          }
+
+          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, existingRecord);
+          docs.add(toDocument(doc)); // merged record goes to the end of the row
+
+          Row row = new Row(rowId, toRecords(docs), docs.size());
+          List<List<Field>> docsToUpdate = TransactionRecorder.getDocs(row, _fieldManager);
+
+          writer.updateDocuments(rowIdTerm, docsToUpdate); // replaces every doc carrying rowIdTerm with the rebuilt row
+        } else {
+          throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
+        }
+      }
+    });
+  }
+
+  public void replaceColumns(final String rowId, final Record record) { // queues an action that overwrites same-named columns of the matching record
+    _actions.add(new InternalAction() { // nothing runs here; performMutate invokes the queued action later
+      @Override
+      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
+        Term rowIdTerm = createRowId(rowId);
+        BooleanQuery query = new BooleanQuery();
+        query.add(new TermQuery(rowIdTerm), Occur.MUST);
+        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST); // both clauses required: the prime doc of this row id
+
+        IndexSearcher searcher = new IndexSearcher(reader);
+        TopDocs topDocs = searcher.search(query, 1); // totalHits still counts every match, only scoreDocs is capped at 1
+        if (topDocs.totalHits == 0) {
+          // just add
+          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, record);
+          doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO)); // the only doc of a new row gets the prime-doc marker
+          writer.addDocument(doc);
+        } else if (topDocs.totalHits == 1) {
+          Selector selector = new Selector();
+          selector.setStartRecord(0);
+          selector.setMaxRecordsToFetch(Integer.MAX_VALUE); // no cap on the number of records fetched
+          selector.setLocationId(_shard + "/" + topDocs.scoreDocs[0].doc); // location id is "<shard>/<prime doc id>"
+          ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
+          List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
+              _maxHeap, _table + "/" + _shard, _primeDocTerm, null)); // copied into an ArrayList because entries are removed/added below
+          BlurThriftRecord existingRecord = new BlurThriftRecord(); // stays empty when no stored doc has the record id
+          for (int i = 0; i < docs.size(); i++) {
+            Document document = docs.get(i);
+            if (document.get(BlurConstants.RECORD_ID).equals(record.getRecordId())) {
+              Document doc = docs.remove(i); // take the old version out; the rebuilt one is re-added below
+              RowDocumentUtil.readRecord(doc, existingRecord);
+              break;
+            }
+          }
+
+          Map<String, List<Column>> map = new HashMap<String, List<Column>>(); // incoming columns grouped by name
+          for (Column column : record.getColumns()) {
+            String name = column.getName();
+            List<Column> list = map.get(name);
+            if (list == null) {
+              list = new ArrayList<Column>();
+              map.put(name, list);
+            }
+            list.add(column);
+          }
+
+          Record newRecord = new Record(record.getRecordId(), record.getFamily(), null); // id and family come from the incoming record
+          Set<String> processedColumns = new HashSet<String>(); // names whose replacement values were already emitted
+          List<Column> columns = existingRecord.getColumns();
+          if (columns != null) { // null when no existing record was found above
+            for (Column column : columns) {
+              String name = column.getName();
+              if (processedColumns.contains(name)) {
+                continue; // further old values of a replaced name are dropped
+              }
+              List<Column> newColumns = map.get(name);
+              if (newColumns != null) { // name is being replaced: emit all new values at the old position
+                for (Column c : newColumns) {
+                  newRecord.addToColumns(c);
+                }
+                processedColumns.add(name);
+              } else {
+                newRecord.addToColumns(column); // untouched column is carried over
+              }
+            }
+          }
+
+          for (Entry<String, List<Column>> e : map.entrySet()) { // incoming names that did not exist before are appended
+            String name = e.getKey();
+            if (processedColumns.contains(name)) {
+              continue;
+            }
+            List<Column> newColumns = e.getValue();
+            for (Column c : newColumns) {
+              newRecord.addToColumns(c);
+            }
+            processedColumns.add(name);
+          }
+
+          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, newRecord);
+          docs.add(toDocument(doc)); // rebuilt record goes to the end of the row
+
+          Row row = new Row(rowId, toRecords(docs), docs.size());
+          List<List<Field>> docsToUpdate = TransactionRecorder.getDocs(row, _fieldManager);
+          writer.updateDocuments(rowIdTerm, docsToUpdate); // replaces every doc carrying rowIdTerm with the rebuilt row
+        } else {
+          throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
+        }
+      }
+    });
+  }
+
+  private Document toDocument(List<Field> doc) { // wraps a field list in a new Lucene Document, keeping field order
+    Document document = new Document();
+    for (Field f : doc) {
+      document.add(f);
+    }
+    return document;
+  }
+
+  void performMutate(IndexReader reader, IndexWriter writer) throws IOException { // runs every queued action in insertion order
+    for (InternalAction internalAction : _actions) {
+      internalAction.performAction(reader, writer); // all actions get the same reader instance
+    }
+    _actions.clear(); // NOTE(review): not reached if an action throws, so the queue keeps its entries - confirm intended
+  }
+
+  public static Term createRowId(String id) { // Term on the BlurConstants.ROW_ID field with the given id as text
+    return new Term(BlurConstants.ROW_ID, id);
+  }
+
+  public static Term createRecordId(String id) { // Term on the BlurConstants.RECORD_ID field with the given id as text
+    return new Term(BlurConstants.RECORD_ID, id);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index beebdf6..b44a26b 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -294,7 +294,7 @@ public class IndexManagerTest {
       }
     });
 
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < 25; i++) {
       Thread thread = new Thread(new Runnable() {
         @Override
         public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
index 686f213..37eece8 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
@@ -169,8 +169,9 @@ public class BlurIndexSimpleWriterTest {
     Trace.setStorage(oldStorage);
   }
 
-  @Test
+//  @Test
   public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
+    // This test doesn't make any sense anymore, because it's no different than the first test.
     setupWriter(configuration, 100, false);
     IndexSearcherClosable searcher1 = writer.getIndexSearcher();
     IndexReader reader1 = searcher1.getIndexReader();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index c5cf3a8..4772420 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -36,6 +36,7 @@ import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.store.Directory;
 import org.junit.After;
 import org.junit.Before;
@@ -52,15 +54,15 @@ public class IndexImporterTest {
 
   private static final Path TMPDIR = new Path("target/tmp");
 
-  private Path base;
+  private Path _base;
   private Configuration configuration;
-  private IndexWriter commitWriter;
-  private IndexImporter indexImporter;
-  private Random random = new Random();
-  private Path path;
-  private Path badRowIdsPath;
-  private IndexWriter mainWriter;
-  private FileSystem fileSystem;
+  private IndexWriter _commitWriter;
+  private IndexImporter _indexImporter;
+  private Random _random = new Random();
+  private Path _path;
+  private Path _badRowIdsPath;
+  private IndexWriter _mainWriter;
+  private FileSystem _fileSystem;
 
   private FieldManager _fieldManager;
 
@@ -68,10 +70,10 @@ public class IndexImporterTest {
   public void setup() throws IOException {
     TableContext.clear();
     configuration = new Configuration();
-    base = new Path(TMPDIR, "blur-index-importer-test");
-    fileSystem = base.getFileSystem(configuration);
-    fileSystem.delete(base, true);
-    fileSystem.mkdirs(base);
+    _base = new Path(TMPDIR, "blur-index-importer-test");
+    _fileSystem = _base.getFileSystem(configuration);
+    _fileSystem.delete(_base, true);
+    _fileSystem.mkdirs(_base);
     setupWriter(configuration);
   }
 
@@ -80,71 +82,59 @@ public class IndexImporterTest {
     tableDescriptor.setName("test-table");
     String uuid = UUID.randomUUID().toString();
 
-    tableDescriptor.setTableUri(new Path(base, "table-table").toUri().toString());
+    tableDescriptor.setTableUri(new Path(_base, "table-table").toUri().toString());
     tableDescriptor.setShardCount(2);
 
     TableContext tableContext = TableContext.create(tableDescriptor);
     ShardContext shardContext = ShardContext.create(tableContext, "shard-00000000");
-    Path tablePath = new Path(base, "table-table");
+    Path tablePath = new Path(_base, "table-table");
     Path shardPath = new Path(tablePath, "shard-00000000");
     String indexDirName = "index_" + uuid;
-    path = new Path(shardPath, indexDirName + ".commit");
-    fileSystem.mkdirs(path);
-    badRowIdsPath = new Path(shardPath, indexDirName + ".bad_rowids");
-    Directory commitDirectory = new HdfsDirectory(configuration, path);
+    _path = new Path(shardPath, indexDirName + ".commit");
+    _fileSystem.mkdirs(_path);
+    _badRowIdsPath = new Path(shardPath, indexDirName + ".bad_rowids");
+    Directory commitDirectory = new HdfsDirectory(configuration, _path);
     Directory mainDirectory = new HdfsDirectory(configuration, shardPath);
     _fieldManager = tableContext.getFieldManager();
     Analyzer analyzerForIndex = _fieldManager.getAnalyzerForIndex();
     IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzerForIndex);
-    commitWriter = new IndexWriter(commitDirectory, conf);
+    conf.setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES);
+    _commitWriter = new IndexWriter(commitDirectory, conf.clone());
 
-    mainWriter = new IndexWriter(mainDirectory, conf);
+    _mainWriter = new IndexWriter(mainDirectory, conf.clone());
     BufferStore.initNewBuffer(128, 128 * 128);
 
-    indexImporter = new IndexImporter(mainWriter, new ReentrantReadWriteLock(), shardContext, TimeUnit.MINUTES, 10);
+    _indexImporter = new IndexImporter(_mainWriter, new ReentrantReadWriteLock(), shardContext, TimeUnit.MINUTES, 10);
   }
 
   @After
   public void tearDown() throws IOException {
-    mainWriter.close();
-    indexImporter.close();
-    base.getFileSystem(configuration).delete(base, true);
+    IOUtils.closeQuietly(_mainWriter);
+    IOUtils.closeQuietly(_indexImporter);
+    _base.getFileSystem(configuration).delete(_base, true);
   }
 
   @Test
   public void testIndexImporterWithCorrectRowIdShardCombination() throws IOException {
     List<Field> document = _fieldManager.getFields("1", genRecord("1"));
-    commitWriter.addDocument(document);
-    commitWriter.commit();
-    commitWriter.close();
-    indexImporter.run();
-    assertFalse(fileSystem.exists(path));
-    assertFalse(fileSystem.exists(badRowIdsPath));
+    _commitWriter.addDocument(document);
+    _commitWriter.commit();
+    _commitWriter.close();
+    _indexImporter.run();
+    assertFalse(_fileSystem.exists(_path));
+    assertFalse(_fileSystem.exists(_badRowIdsPath));
   }
 
-  // private void debug(Path file) throws IOException {
-  // if (!fileSystem.exists(file)) {
-  // return;
-  // }
-  // System.out.println(file);
-  // if (!fileSystem.isFile(file)) {
-  // FileStatus[] listStatus = fileSystem.listStatus(file);
-  // for (FileStatus f : listStatus) {
-  // debug(f.getPath());
-  // }
-  // }
-  // }
-
   @Test
   public void testIndexImporterWithWrongRowIdShardCombination() throws IOException {
     setupWriter(configuration);
     List<Field> document = _fieldManager.getFields("2", genRecord("1"));
-    commitWriter.addDocument(document);
-    commitWriter.commit();
-    commitWriter.close();
-    indexImporter.run();
-    assertFalse(fileSystem.exists(path));
-    assertTrue(fileSystem.exists(badRowIdsPath));
+    _commitWriter.addDocument(document);
+    _commitWriter.commit();
+    _commitWriter.close();
+    _indexImporter.run();
+    assertFalse(_fileSystem.exists(_path));
+    assertTrue(_fileSystem.exists(_badRowIdsPath));
   }
 
   private Record genRecord(String recordId) {
@@ -152,7 +142,7 @@ public class IndexImporterTest {
     record.setFamily("testing");
     record.setRecordId(recordId);
     for (int i = 0; i < 10; i++) {
-      record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
+      record.addToColumns(new Column("col" + i, Long.toString(_random.nextLong())));
     }
     return record;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-core/src/test/java/org/apache/blur/manager/writer/MutatableActionTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/MutatableActionTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/MutatableActionTest.java
new file mode 100644
index 0000000..90ad9e5
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/MutatableActionTest.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Version;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MutatableActionTest {
+
+  private static final File TMPDIR = new File("./target/tmp");
+  private static final String TABLE = "test";
+
+  private MutatableAction _action;
+  private final Random _random = new Random();
+  private IndexWriterConfig _conf;
+  private File _base;
+
+  @Before // per-test fixture: empties the working directory and builds a fresh MutatableAction
+  public void setup() throws IOException {
+    TableContext.clear();
+    _base = new File(TMPDIR, "MutatableActionTest");
+    rmr(_base); // remove whatever a previous run left behind
+
+    File file = new File(_base, TABLE);
+    file.mkdirs();
+
+    TableContext.clear(); // NOTE(review): second clear looks redundant with the one above - confirm
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName("test");
+    tableDescriptor.setTableUri(file.toURI().toString());
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    ShardContext shardContext = ShardContext.create(tableContext, "test");
+    _action = new MutatableAction(shardContext);
+    _conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer()); // tests pass _conf.clone() to each IndexWriter
+  }
+
+  private void rmr(File file) { // recursive delete: children first, then the file/directory itself
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rmr(f);
+      }
+    }
+    file.delete(); // result ignored, so a failed delete goes unnoticed
+  }
+
+  @Test // replaceRow twice on the same row id: doc count goes 0 -> 1 -> 2, i.e. the second call replaces the first row
+  public void testReplaceRow() throws IOException {
+    RAMDirectory directory = new RAMDirectory();
+    DirectoryReader reader = getIndexReader(directory);
+    IndexWriter writer = new IndexWriter(directory, _conf.clone());
+    assertEquals(0, reader.numDocs());
+
+    Row row = genRow(); // one record
+    _action.replaceRow(row);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(1, reader.numDocs());
+
+    Row row2 = new Row(row); // copy of row, so same row id
+    List<Column> cols = new ArrayList<Column>();
+    cols.add(new Column("n", "v"));
+    row2.addToRecords(new Record("1", "fam", cols)); // row2 now has two records
+
+    _action.replaceRow(row2);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(2, reader.numDocs()); // 2, not 3: the first version of the row is gone
+  }
+
+  @Test // deleting record "1" from a two-record row must leave exactly one doc
+  public void testDeleteRecord() throws IOException {
+    RAMDirectory directory = new RAMDirectory();
+    DirectoryReader reader = getIndexReader(directory);
+    IndexWriter writer = new IndexWriter(directory, _conf.clone());
+    assertEquals(0, reader.numDocs());
+
+    Row row = genRow();
+    List<Column> cols = new ArrayList<Column>();
+    cols.add(new Column("n", "v"));
+    row.addToRecords(new Record("1", "fam", cols)); // second record, id "1"
+
+    _action.replaceRow(row);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(2, reader.numDocs());
+
+    _action.deleteRecord(row.getId(), "1");
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(1, reader.numDocs());
+  }
+
+  @Test // deleteRow on a one-record row must bring the index back to zero docs
+  public void testDeleteRow() throws IOException {
+    RAMDirectory directory = new RAMDirectory();
+    DirectoryReader reader = getIndexReader(directory);
+    IndexWriter writer = new IndexWriter(directory, _conf.clone());
+    assertEquals(0, reader.numDocs());
+
+    Row row = genRow();
+    _action.replaceRow(row);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(1, reader.numDocs());
+
+    _action.deleteRow(row.getId());
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(0, reader.numDocs());
+  }
+
+  @Test // replaceRecord on existing record id "1": row stays at 2 docs and the new column "n2" becomes readable
+  public void testReplaceRecord() throws IOException {
+    RAMDirectory directory = new RAMDirectory();
+    DirectoryReader reader = getIndexReader(directory);
+    IndexWriter writer = new IndexWriter(directory, _conf.clone());
+    assertEquals(0, reader.numDocs());
+
+    Row row = genRow();
+    List<Column> cols = new ArrayList<Column>();
+    cols.add(new Column("n", "v"));
+    row.addToRecords(new Record("1", "fam", cols)); // second record, id "1"
+
+    _action.replaceRow(row);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(2, reader.numDocs());
+
+    cols.add(new Column("n2", "v2")); // replacement record carries both "n" and "n2"
+    Record record = new Record("1", "fam", cols);
+    _action.replaceRecord(row.getId(), record);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(2, reader.numDocs()); // replaced in place, not added
+
+    IndexSearcher searcher = new IndexSearcher(reader);
+    TopDocs topDocs = searcher.search(new TermQuery(new Term(BlurConstants.ROW_ID, row.getId())), 10);
+    Document doc2 = searcher.doc(topDocs.scoreDocs[1].doc); // second hit is expected to be the replaced record
+    List<IndexableField> fields = doc2.getFields();
+    assertEquals(5, fields.size()); // JUnit order is (expected, actual); was swapped, giving a misleading failure message
+    String value = doc2.get("fam.n2");
+    assertEquals("v2", value);
+  }
+
+  @Test // appendColumns on existing record id "1": row stays at 2 docs and the appended column "n2" becomes readable
+  public void testAppendColumns() throws IOException {
+    RAMDirectory directory = new RAMDirectory();
+    DirectoryReader reader = getIndexReader(directory);
+    IndexWriter writer = new IndexWriter(directory, _conf.clone());
+    assertEquals(0, reader.numDocs());
+
+    Row row = genRow();
+    List<Column> cols = new ArrayList<Column>();
+    cols.add(new Column("n", "v"));
+    row.addToRecords(new Record("1", "fam", cols)); // second record, id "1"
+
+    _action.replaceRow(row);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(2, reader.numDocs());
+
+    cols.clear(); // the record passed to appendColumns carries only the new column
+    cols.add(new Column("n2", "v2"));
+    Record record = new Record("1", "fam", cols);
+    _action.appendColumns(row.getId(), record);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(2, reader.numDocs()); // merged into the existing record, not added
+
+    IndexSearcher searcher = new IndexSearcher(reader);
+    TopDocs topDocs = searcher.search(new TermQuery(new Term(BlurConstants.ROW_ID, row.getId())), 10);
+    Document doc2 = searcher.doc(topDocs.scoreDocs[1].doc); // second hit is expected to be the merged record
+    List<IndexableField> fields = doc2.getFields();
+    assertEquals(5, fields.size()); // JUnit order is (expected, actual); was swapped, giving a misleading failure message
+    String value = doc2.get("fam.n2");
+    assertEquals("v2", value);
+  }
+
+  @Test // replaceColumns on record "1": column "n1" changes from "v1" to "v2" while the row stays at 2 docs
+  public void testReplaceColumns() throws IOException {
+    RAMDirectory directory = new RAMDirectory();
+    DirectoryReader reader = getIndexReader(directory);
+    IndexWriter writer = new IndexWriter(directory, _conf.clone());
+    assertEquals(0, reader.numDocs());
+
+    Row row = genRow();
+    List<Column> cols = new ArrayList<Column>();
+    cols.add(new Column("n", "v"));
+    cols.add(new Column("n1", "v1"));
+    row.addToRecords(new Record("1", "fam", cols)); // second record, id "1", columns "n" and "n1"
+
+    _action.replaceRow(row);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(2, reader.numDocs());
+
+    cols.clear(); // the record passed to replaceColumns carries only the column to overwrite
+    cols.add(new Column("n1", "v2"));
+    Record record = new Record("1", "fam", cols);
+    _action.replaceColumns(row.getId(), record);
+    _action.performMutate(reader, writer);
+    reader = commitAndReopen(reader, writer);
+    assertEquals(2, reader.numDocs()); // rewritten in place, not added
+
+    IndexSearcher searcher = new IndexSearcher(reader);
+    TopDocs topDocs = searcher.search(new TermQuery(new Term(BlurConstants.ROW_ID, row.getId())), 10);
+    Document doc2 = searcher.doc(topDocs.scoreDocs[1].doc); // second hit is expected to be the rewritten record
+    List<IndexableField> fields = doc2.getFields();
+    assertEquals(5, fields.size());
+    String value = doc2.get("fam.n1");
+    assertEquals("v2", value);
+  }
+
+  private DirectoryReader commitAndReopen(DirectoryReader reader, IndexWriter writer) throws IOException { // commits, then swaps in a reader that sees the commit
+    writer.commit();
+    DirectoryReader newReader = DirectoryReader.openIfChanged(reader); // null means the index did not change
+    if (newReader == null) {
+      throw new IOException("Should have new data."); // every caller expects its mutation to have changed the index
+    }
+    reader.close(); // old reader is closed only once the new one exists
+    return newReader;
+  }
+
+  private DirectoryReader getIndexReader(RAMDirectory directory) throws IOException { // opens a reader, creating the index first if the directory has none
+    if (!DirectoryReader.indexExists(directory)) {
+      new IndexWriter(directory, _conf.clone()).close(); // open+close a writer; presumably this writes the initial empty commit - confirm
+    }
+    return DirectoryReader.open(directory);
+  }
+
+  private Row genRow() { // builds a Row with a random id holding one record of ten random-valued columns
+    Row row = new Row();
+    row.setId(Long.toString(_random.nextLong()));
+    Record record = new Record();
+    record.setFamily("testing");
+    record.setRecordId(Long.toString(_random.nextLong()));
+    for (int i = 0; i < 10; i++) {
+      record.addToColumns(new Column("col" + i, Long.toString(_random.nextLong()))); // names col0..col9
+    }
+    row.addToRecords(record);
+    return row;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/813ccd7d/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index cb22b3b..e4607f1 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -349,6 +349,14 @@ public class HdfsDirectory extends Directory implements LastModified {
     if (ifSameCluster(simpleTo, this)) {
       Path newDest = simpleTo.getPath(dest);
       Path oldSrc = getPath(src);
+      if (_useCache) {
+        simpleTo._fileMap.put(dest, _fileMap.get(src));
+        _fileMap.remove(src);
+        FSDataInputStream inputStream = _inputMap.get(src);
+        if (inputStream != null) {
+          inputStream.close();
+        }
+      }
       return _fileSystem.rename(oldSrc, newDest);
     }
     return false;


Mime
View raw message