incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Fixed BLUR-328
Date Thu, 16 Jan 2014 14:42:47 GMT
Fixed BLUR-328


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/a0fd157b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/a0fd157b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/a0fd157b

Branch: refs/heads/master
Commit: a0fd157b639359ea019ec2b0e2ffb1f696537c4e
Parents: 98980fe
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jan 16 09:41:42 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jan 16 09:42:39 2014 -0500

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   | 116 +++-
 .../manager/writer/BlurIndexSimpleWriter.java   |  12 +-
 .../blur/manager/writer/MutatableAction.java    | 534 ++++++++-----------
 .../apache/blur/manager/IndexManagerTest.java   |  32 ++
 4 files changed, 388 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0fd157b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index f8a23cf..4ca8cf1 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -130,7 +131,7 @@ import com.yammer.metrics.core.TimerContext;
 
 public class IndexManager {
 
-  private static final String NOT_FOUND = "NOT_FOUND";
+  public static final String NOT_FOUND = "NOT_FOUND";
   private static final Log LOG = LogFactory.getLog(IndexManager.class);
 
   private final Meter _readRecordsMeter;
@@ -371,8 +372,8 @@ public class IndexManager {
         getScoreType(query.scoreType), context);
   }
 
-  private void populateSelector(IndexSearcherClosable searcher, String shardName, String table, Selector selector)
-      throws IOException, BlurException {
+  public static void populateSelector(IndexSearcherClosable searcher, String shardName, String table, Selector selector)
+      throws IOException {
     Tracer trace = Trace.trace("populate selector");
     String rowId = selector.rowId;
     String recordId = selector.recordId;
@@ -939,12 +940,121 @@ public class IndexManager {
   }
 
   private void doMutates(List<RowMutation> mutations) throws BlurException, IOException {
+    mutations = reduceMutates(mutations);
     Map<String, List<RowMutation>> map = getMutatesPerTable(mutations);
     for (Entry<String, List<RowMutation>> entry : map.entrySet()) {
       doMutates(entry.getKey(), entry.getValue());
     }
   }
 
+  private List<RowMutation> reduceMutates(List<RowMutation> mutations) throws BlurException {
+    Map<String, RowMutation> mutateMap = new TreeMap<String, RowMutation>();
+    for (RowMutation mutation : mutations) {
+      RowMutation rowMutation = mutateMap.get(mutation.getRowId());
+      if (rowMutation != null) {
+        mutateMap.put(mutation.getRowId(), merge(rowMutation, mutation));
+      } else {
+        mutateMap.put(mutation.getRowId(), mutation);
+      }
+    }
+    return new ArrayList<RowMutation>(mutateMap.values());
+  }
+
+  private RowMutation merge(RowMutation mutation1, RowMutation mutation2) throws BlurException {
+    RowMutationType rowMutationType1 = mutation1.getRowMutationType();
+    RowMutationType rowMutationType2 = mutation2.getRowMutationType();
+    if (!rowMutationType1.equals(rowMutationType2)) {
+      throw new BException(
+          "RowMutation conflict, cannot perform 2 different operations on the same row in the same batch. [{0}] [{1}]",
+          mutation1, mutation2);
+    }
+    if (rowMutationType1.equals(RowMutationType.DELETE_ROW)) {
+      // Since both are trying to delete the same row, just pick one and move
+      // on.
+      return mutation1;
+    } else if (rowMutationType1.equals(RowMutationType.REPLACE_ROW)) {
+      throw new BException(
+          "RowMutation conflict, cannot perform 2 different REPLACE_ROW mutations on the same row in the same batch. [{0}] [{1}]",
+          mutation1, mutation2);
+    } else {
+      // Now this is a row update, so try to merge the record mutations
+      List<RecordMutation> recordMutations1 = mutation1.getRecordMutations();
+      List<RecordMutation> recordMutations2 = mutation2.getRecordMutations();
+      List<RecordMutation> mergedRecordMutations = merge(recordMutations1, recordMutations2);
+      mutation1.setRecordMutations(mergedRecordMutations);
+      return mutation1;
+    }
+  }
+
+  private List<RecordMutation> merge(List<RecordMutation> recordMutations1, List<RecordMutation> recordMutations2)
+      throws BException {
+    Map<String, RecordMutation> recordMutationMap = new TreeMap<String, RecordMutation>();
+    merge(recordMutations1, recordMutationMap);
+    merge(recordMutations2, recordMutationMap);
+    return new ArrayList<RecordMutation>(recordMutationMap.values());
+  }
+
+  private void merge(List<RecordMutation> recordMutations, Map<String, RecordMutation> recordMutationMap)
+      throws BException {
+    for (RecordMutation recordMutation : recordMutations) {
+      Record record = recordMutation.getRecord();
+      String recordId = record.getRecordId();
+      RecordMutation existing = recordMutationMap.get(recordId);
+      if (existing != null) {
+        recordMutationMap.put(recordId, merge(recordMutation, existing));
+      } else {
+        recordMutationMap.put(recordId, recordMutation);
+      }
+    }
+  }
+
+  private RecordMutation merge(RecordMutation recordMutation1, RecordMutation recordMutation2) throws BException {
+    RecordMutationType recordMutationType1 = recordMutation1.getRecordMutationType();
+    RecordMutationType recordMutationType2 = recordMutation2.getRecordMutationType();
+    if (!recordMutationType1.equals(recordMutationType2)) {
+      throw new BException(
+          "RecordMutation conflict, cannot perform 2 different operations on the same record in the same row in the same batch. [{0}] [{1}]",
+          recordMutation1, recordMutation2);
+    }
+
+    if (recordMutationType1.equals(RecordMutationType.DELETE_ENTIRE_RECORD)) {
+      // Since both are trying to delete the same record, just pick one and move
+      // on.
+      return recordMutation1;
+    } else if (recordMutationType1.equals(RecordMutationType.REPLACE_ENTIRE_RECORD)) {
+      throw new BException(
+          "RecordMutation conflict, cannot perform 2 different replace record operations on the same record in the same row in the same batch. [{0}] [{1}]",
+          recordMutation1, recordMutation2);
+    } else if (recordMutationType1.equals(RecordMutationType.REPLACE_COLUMNS)) {
+      throw new BException(
+          "RecordMutation conflict, cannot perform 2 different replace columns operations on the same record in the same row in the same batch. [{0}] [{1}]",
+          recordMutation1, recordMutation2);
+    } else {
+      Record record1 = recordMutation1.getRecord();
+      Record record2 = recordMutation2.getRecord();
+      String family1 = record1.getFamily();
+      String family2 = record2.getFamily();
+
+      if (isSameFamily(family1, family2)) {
+        record1.getColumns().addAll(record2.getColumns());
+        return recordMutation1;
+      } else {
+        throw new BException("RecordMutation conflict, cannot merge records with different family. [{0}] [{1}]",
+            recordMutation1, recordMutation2);
+      }
+    }
+  }
+
+  private boolean isSameFamily(String family1, String family2) {
+    if (family1 == null && family2 == null) {
+      return true;
+    }
+    if (family1 != null && family1.equals(family2)) {
+      return true;
+    }
+    return false;
+  }
+
   private void doMutates(final String table, List<RowMutation> mutations) throws IOException, BlurException {
     final Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0fd157b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 0427195..d9093dc 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -267,9 +267,10 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     if (newReader == null) {
       LOG.error("Reader should be new after commit for table [{0}] shard [{1}].", _tableContext.getTable(),
           _shardContext.getShard());
+    } else {
+      _indexReader.set(wrap(newReader));
+      _indexCloser.close(currentReader);
     }
-    _indexReader.set(wrap(newReader));
-    _indexCloser.close(currentReader);
     trace3.done();
   }
 
@@ -278,14 +279,19 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _writeLock.lock();
     waitUntilNotNull(_writer);
     BlurIndexWriter writer = _writer.get();
+    IndexSearcherClosable indexSearcher = null;
     try {
-      mutatableAction.performMutate(_indexReader.get(), writer);
+      indexSearcher = getIndexSearcher();
+      mutatableAction.performMutate(indexSearcher, writer);
       commit();
     } catch (Exception e) {
       writer.rollback();
       openWriter();
       throw new IOException("Unknown error during mutation", e);
     } finally {
+      if (indexSearcher != null) {
+        indexSearcher.close();
+      }
       _writeLock.unlock();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0fd157b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
index 70cb377..05e99af 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
@@ -18,66 +18,255 @@ package org.apache.blur.manager.writer;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.TreeMap;
 
 import org.apache.blur.analysis.FieldManager;
 import org.apache.blur.manager.IndexManager;
+import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.FetchRowResult;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurThriftRecord;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.ResetableDocumentStoredFieldVisitor;
 import org.apache.blur.utils.RowDocumentUtil;
-import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
 
 public class MutatableAction {
 
+  static class UpdateRow extends InternalAction {
+
+    static abstract class UpdateRowAction {
+      abstract Row performAction(Row row);
+    }
+
+    private final List<UpdateRowAction> _actions = new ArrayList<UpdateRowAction>();
+    private final String _rowId;
+    private final String _table;
+    private final String _shard;
+    private final int _maxHeap;
+    private final TableContext _tableContext;
+    private final FieldManager _fieldManager;
+
+    UpdateRow(String rowId, String table, String shard, int maxHeap, TableContext tableContext) {
+      _rowId = rowId;
+      _table = table;
+      _shard = shard;
+      _maxHeap = maxHeap;
+      _tableContext = tableContext;
+      _fieldManager = _tableContext.getFieldManager();
+    }
+
+    void deleteRecord(final String recordId) {
+      _actions.add(new UpdateRowAction() {
+        @Override
+        Row performAction(Row row) {
+          if (row == null) {
+            return null;
+          } else {
+            if (row.getRecords() == null) {
+              return row;
+            }
+            Row result = new Row();
+            result.setId(row.getId());
+            for (Record record : row.getRecords()) {
+              if (!record.getRecordId().equals(recordId)) {
+                result.addToRecords(record);
+              }
+            }
+            return result;
+          }
+        }
+      });
+    }
+
+    void appendColumns(final Record record) {
+      _actions.add(new UpdateRowAction() {
+        @Override
+        Row performAction(Row row) {
+          if (row == null) {
+            row = new Row(_rowId, null);
+            row.addToRecords(record);
+            return row;
+          } else {
+            Row result = new Row();
+            result.setId(row.getId());
+            String recordId = record.getRecordId();
+            boolean found = false;
+            if (row.getRecords() != null) {
+              for (Record r : row.getRecords()) {
+                if (!r.getRecordId().equals(recordId)) {
+                  result.addToRecords(r);
+                } else {
+                  found = true;
+                  // Append columns
+                  r.getColumns().addAll(record.getColumns());
+                  result.addToRecords(r);
+                }
+              }
+            }
+            if (!found) {
+              result.addToRecords(record);
+            }
+            return result;
+          }
+        }
+      });
+    }
+
+    void replaceColumns(final Record record) {
+      _actions.add(new UpdateRowAction() {
+        @Override
+        Row performAction(Row row) {
+          if (row == null) {
+            row = new Row(_rowId, null);
+            row.addToRecords(record);
+            return row;
+          } else {
+            Row result = new Row();
+            result.setId(row.getId());
+            String recordId = record.getRecordId();
+            boolean found = false;
+            if (row.getRecords() != null) {
+              for (Record r : row.getRecords()) {
+                if (!r.getRecordId().equals(recordId)) {
+                  result.addToRecords(r);
+                } else {
+                  found = true;
+                  // Replace columns
+                  result.addToRecords(replaceColumns(r, record));
+                }
+              }
+            }
+            if (!found) {
+              result.addToRecords(record);
+            }
+            return result;
+          }
+        }
+      });
+    }
+
+    protected Record replaceColumns(Record existing, Record newRecord) {
+      Map<String, List<Column>> existingColumns = getColumnMap(existing.getColumns());
+      Map<String, List<Column>> newColumns = getColumnMap(newRecord.getColumns());
+      existingColumns.putAll(newColumns);
+      Record record = new Record();
+      record.setFamily(existing.getFamily());
+      record.setRecordId(existing.getRecordId());
+      record.setColumns(toList(existingColumns.values()));
+      return record;
+    }
+
+    private List<Column> toList(Collection<List<Column>> values) {
+      ArrayList<Column> list = new ArrayList<Column>();
+      for (List<Column> v : values) {
+        list.addAll(v);
+      }
+      return list;
+    }
+
+    private Map<String, List<Column>> getColumnMap(List<Column> columns) {
+      Map<String, List<Column>> columnMap = new TreeMap<String, List<Column>>();
+      for (Column column : columns) {
+        String name = column.getName();
+        List<Column> list = columnMap.get(name);
+        if (list == null) {
+          list = new ArrayList<Column>();
+          columnMap.put(name, list);
+        }
+        list.add(column);
+      }
+      return columnMap;
+    }
+
+    void replaceRecord(final Record record) {
+      _actions.add(new UpdateRowAction() {
+        @Override
+        Row performAction(Row row) {
+          if (row == null) {
+            row = new Row(_rowId, null);
+            row.addToRecords(record);
+            return row;
+          } else {
+            Row result = new Row();
+            result.setId(row.getId());
+            String recordId = record.getRecordId();
+            if (row.getRecords() != null) {
+              for (Record r : row.getRecords()) {
+                if (!r.getRecordId().equals(recordId)) {
+                  result.addToRecords(r);
+                }
+              }
+            }
+            // Add replacement
+            result.addToRecords(record);
+            return result;
+          }
+        }
+      });
+    }
+
+    @Override
+    void performAction(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
+      Selector selector = new Selector();
+      selector.setRowId(_rowId);
+      IndexManager.populateSelector(searcher, _shard, _table, selector);
+      Row row = null;
+      if (!selector.getLocationId().equals(IndexManager.NOT_FOUND)) {
+        FetchResult fetchResult = new FetchResult();
+        IndexManager.fetchRow(searcher.getIndexReader(), _table, _shard, selector, fetchResult, null, null, _maxHeap, _tableContext, null);
+        FetchRowResult rowResult = fetchResult.getRowResult();
+        if (rowResult != null) {
+          row = rowResult.getRow();
+        }
+      }
+      for (UpdateRowAction action : _actions) {
+        row = action.performAction(row);
+      }
+      Term term = createRowId(_rowId);
+      if (row != null && row.getRecords() != null && row.getRecords().size() > 0) {
+        List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
+        writer.updateDocuments(term, docsToUpdate);
+      } else {
+        writer.deleteDocuments(term);
+      }
+    }
+
+  }
+
   static abstract class InternalAction {
-    abstract void performAction(IndexReader reader, IndexWriter writer) throws IOException;
+    abstract void performAction(IndexSearcherClosable searcher, IndexWriter writer) throws IOException;
   }
 
   private final List<InternalAction> _actions = new ArrayList<InternalAction>();
+  private final Map<String, UpdateRow> _rowUpdates = new HashMap<String, UpdateRow>();
   private final FieldManager _fieldManager;
   private final String _shard;
   private final String _table;
-  private final Term _primeDocTerm;
   private final int _maxHeap = Integer.MAX_VALUE;
+  private TableContext _tableContext;
 
   public MutatableAction(ShardContext context) {
-    TableContext tableContext = context.getTableContext();
+    _tableContext = context.getTableContext();
     _shard = context.getShard();
-    _table = tableContext.getTable();
-    _fieldManager = tableContext.getFieldManager();
-    _primeDocTerm = tableContext.getDefaultPrimeDocTerm();
+    _table = _tableContext.getTable();
+    _fieldManager = _tableContext.getFieldManager();
   }
 
   public void deleteRow(final String rowId) {
     _actions.add(new InternalAction() {
       @Override
-      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
+      void performAction(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
         writer.deleteDocuments(createRowId(rowId));
       }
     });
@@ -86,7 +275,7 @@ public class MutatableAction {
   public void replaceRow(final Row row) {
     _actions.add(new InternalAction() {
       @Override
-      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
+      void performAction(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
         List<List<Field>> docs = RowDocumentUtil.getDocs(row, _fieldManager);
         Term rowId = createRowId(row.getId());
         writer.updateDocuments(rowId, docs);
@@ -95,294 +284,29 @@ public class MutatableAction {
   }
 
   public void deleteRecord(final String rowId, final String recordId) {
-    _actions.add(new InternalAction() {
-      @Override
-      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
-        Term rowIdTerm = createRowId(rowId);
-        BooleanQuery query = new BooleanQuery();
-        query.add(new TermQuery(rowIdTerm), Occur.MUST);
-        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST);
-
-        IndexSearcher searcher = new IndexSearcher(reader);
-        TopDocs topDocs = searcher.search(query, 1);
-        if (topDocs.totalHits == 0) {
-          // do nothing
-        } else if (topDocs.totalHits == 1) {
-          Selector selector = new Selector();
-          selector.setStartRecord(0);
-          selector.setMaxRecordsToFetch(Integer.MAX_VALUE);
-          selector.setLocationId(_shard + "/" + topDocs.scoreDocs[0].doc);
-          ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
-          AtomicBoolean moreDocsToFetch = new AtomicBoolean(false);
-          AtomicInteger totalRecords = new AtomicInteger();
-          List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
-              _maxHeap, _table + "/" + _shard, _primeDocTerm, null, moreDocsToFetch, totalRecords, null));
-          if (moreDocsToFetch.get()) {
-            throw new IOException("Row too large to update.");
-          }
-          boolean found = false;
-          for (int i = 0; i < docs.size(); i++) {
-            Document document = docs.get(i);
-            if (document.get(BlurConstants.RECORD_ID).equals(recordId)) {
-              docs.remove(i);
-              found = true;
-              break;
-            }
-          }
-          if (found) {
-            if (docs.size() == 0) {
-              writer.deleteDocuments(rowIdTerm);
-            } else {
-              Row row = new Row(rowId, toRecords(docs));
-              List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
-              writer.updateDocuments(rowIdTerm, docsToUpdate);
-            }
-          }
-        } else {
-          throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
-        }
-      }
-    });
+    UpdateRow updateRow = getUpdateRow(rowId);
+    updateRow.deleteRecord(recordId);
   }
 
   public void replaceRecord(final String rowId, final Record record) {
-    _actions.add(new InternalAction() {
-      @Override
-      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
-        Term rowIdTerm = createRowId(rowId);
-        BooleanQuery query = new BooleanQuery();
-        query.add(new TermQuery(rowIdTerm), Occur.MUST);
-        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST);
-
-        IndexSearcher searcher = new IndexSearcher(reader);
-        TopDocs topDocs = searcher.search(query, 1);
-        if (topDocs.totalHits == 0) {
-          // just add
-          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, record);
-          doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
-          writer.addDocument(doc);
-        } else if (topDocs.totalHits == 1) {
-          Selector selector = new Selector();
-          selector.setStartRecord(0);
-          selector.setMaxRecordsToFetch(Integer.MAX_VALUE);
-          selector.setLocationId(_shard + "/" + topDocs.scoreDocs[0].doc);
-          ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
-          AtomicBoolean moreDocsToFetch = new AtomicBoolean(false);
-          AtomicInteger totalRecords = new AtomicInteger();
-          List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
-              _maxHeap, _table + "/" + _shard, _primeDocTerm, null, moreDocsToFetch, totalRecords, null));
-          if (moreDocsToFetch.get()) {
-            throw new IOException("Row too large to update.");
-          }
-          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, record);
-
-          for (int i = 0; i < docs.size(); i++) {
-            Document document = docs.get(i);
-            if (document.get(BlurConstants.RECORD_ID).equals(record.getRecordId())) {
-              docs.remove(i);
-              break;
-            }
-          }
-          docs.add(toDocument(doc));
-          Row row = new Row(rowId, toRecords(docs));
-          List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
-          writer.updateDocuments(rowIdTerm, docsToUpdate);
-        } else {
-          throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
-        }
-      }
-    });
-  }
-
-  private List<Record> toRecords(List<Document> docs) {
-    List<Record> records = new ArrayList<Record>();
-    for (Document document : docs) {
-      BlurThriftRecord existingRecord = new BlurThriftRecord();
-      RowDocumentUtil.readRecord(document, existingRecord);
-      records.add(existingRecord);
-    }
-    return records;
+    UpdateRow updateRow = getUpdateRow(rowId);
+    updateRow.replaceRecord(record);
   }
 
   public void appendColumns(final String rowId, final Record record) {
-    _actions.add(new InternalAction() {
-      @Override
-      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
-        Term rowIdTerm = createRowId(rowId);
-        BooleanQuery query = new BooleanQuery();
-        query.add(new TermQuery(rowIdTerm), Occur.MUST);
-        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST);
-
-        IndexSearcher searcher = new IndexSearcher(reader);
-        TopDocs topDocs = searcher.search(query, 1);
-        if (topDocs.totalHits == 0) {
-          // just add
-          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, record);
-          doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
-          writer.addDocument(doc);
-        } else if (topDocs.totalHits == 1) {
-          Selector selector = new Selector();
-          selector.setStartRecord(0);
-          selector.setMaxRecordsToFetch(Integer.MAX_VALUE);
-          selector.setLocationId(_shard + "/" + topDocs.scoreDocs[0].doc);
-          ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
-          AtomicBoolean moreDocsToFetch = new AtomicBoolean(false);
-          AtomicInteger totalRecords = new AtomicInteger();
-          List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
-              _maxHeap, _table + "/" + _shard, _primeDocTerm, null, moreDocsToFetch, totalRecords, null));
-          if (moreDocsToFetch.get()) {
-            throw new IOException("Row too large to update.");
-          }
-          BlurThriftRecord existingRecord = new BlurThriftRecord();
-          for (int i = 0; i < docs.size(); i++) {
-            Document document = docs.get(i);
-            if (document.get(BlurConstants.RECORD_ID).equals(record.getRecordId())) {
-              Document doc = docs.remove(i);
-              RowDocumentUtil.readRecord(doc, existingRecord);
-              break;
-            }
-          }
-
-          String recordId = existingRecord.getRecordId();
-          if (recordId == null) {
-            existingRecord.setRecordId(record.getRecordId());
-          } else if (!recordId.equals(record.getRecordId())) {
-            throw new IOException("Record ids do not match.");
-          }
-
-          String family = existingRecord.getFamily();
-          if (family == null) {
-            existingRecord.setFamily(record.getFamily());
-          } else if (!family.equals(record.getFamily())) {
-            throw new IOException("Record family do not match.");
-          }
-
-          for (Column column : record.getColumns()) {
-            existingRecord.addToColumns(column);
-          }
-
-          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, existingRecord);
-          docs.add(toDocument(doc));
-
-          Row row = new Row(rowId, toRecords(docs));
-          List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
-
-          writer.updateDocuments(rowIdTerm, docsToUpdate);
-        } else {
-          throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
-        }
-      }
-    });
+    UpdateRow updateRow = getUpdateRow(rowId);
+    updateRow.appendColumns(record);
   }
 
   public void replaceColumns(final String rowId, final Record record) {
-    _actions.add(new InternalAction() {
-      @Override
-      void performAction(IndexReader reader, IndexWriter writer) throws IOException {
-        Term rowIdTerm = createRowId(rowId);
-        BooleanQuery query = new BooleanQuery();
-        query.add(new TermQuery(rowIdTerm), Occur.MUST);
-        query.add(new TermQuery(BlurUtil.PRIME_DOC_TERM), Occur.MUST);
-
-        IndexSearcher searcher = new IndexSearcher(reader);
-        TopDocs topDocs = searcher.search(query, 1);
-        if (topDocs.totalHits == 0) {
-          // just add
-          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, record);
-          doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
-          writer.addDocument(doc);
-        } else if (topDocs.totalHits == 1) {
-          Selector selector = new Selector();
-          selector.setStartRecord(0);
-          selector.setMaxRecordsToFetch(Integer.MAX_VALUE);
-          selector.setLocationId(_shard + "/" + topDocs.scoreDocs[0].doc);
-          ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
-          AtomicBoolean moreDocsToFetch = new AtomicBoolean(false);
-          AtomicInteger totalRecords = new AtomicInteger();
-          List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
-              _maxHeap, _table + "/" + _shard, _primeDocTerm, null, moreDocsToFetch, totalRecords, null));
-          if (moreDocsToFetch.get()) {
-            throw new IOException("Row too large to update.");
-          }
-          BlurThriftRecord existingRecord = new BlurThriftRecord();
-          for (int i = 0; i < docs.size(); i++) {
-            Document document = docs.get(i);
-            if (document.get(BlurConstants.RECORD_ID).equals(record.getRecordId())) {
-              Document doc = docs.remove(i);
-              RowDocumentUtil.readRecord(doc, existingRecord);
-              break;
-            }
-          }
-
-          Map<String, List<Column>> map = new HashMap<String, List<Column>>();
-          for (Column column : record.getColumns()) {
-            String name = column.getName();
-            List<Column> list = map.get(name);
-            if (list == null) {
-              list = new ArrayList<Column>();
-              map.put(name, list);
-            }
-            list.add(column);
-          }
-
-          Record newRecord = new Record(record.getRecordId(), record.getFamily(), null);
-          Set<String> processedColumns = new HashSet<String>();
-          List<Column> columns = existingRecord.getColumns();
-          if (columns != null) {
-            for (Column column : columns) {
-              String name = column.getName();
-              if (processedColumns.contains(name)) {
-                continue;
-              }
-              List<Column> newColumns = map.get(name);
-              if (newColumns != null) {
-                for (Column c : newColumns) {
-                  newRecord.addToColumns(c);
-                }
-                processedColumns.add(name);
-              } else {
-                newRecord.addToColumns(column);
-              }
-            }
-          }
-
-          for (Entry<String, List<Column>> e : map.entrySet()) {
-            String name = e.getKey();
-            if (processedColumns.contains(name)) {
-              continue;
-            }
-            List<Column> newColumns = e.getValue();
-            for (Column c : newColumns) {
-              newRecord.addToColumns(c);
-            }
-            processedColumns.add(name);
-          }
-
-          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, newRecord);
-          docs.add(toDocument(doc));
-
-          Row row = new Row(rowId, toRecords(docs));
-          List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
-          writer.updateDocuments(rowIdTerm, docsToUpdate);
-        } else {
-          throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
-        }
-      }
-    });
-  }
-
-  private Document toDocument(List<Field> doc) {
-    Document document = new Document();
-    for (Field f : doc) {
-      document.add(f);
-    }
-    return document;
+    UpdateRow updateRow = getUpdateRow(rowId);
+    updateRow.replaceColumns(record);
   }
 
-  void performMutate(IndexReader reader, IndexWriter writer) throws IOException {
+  void performMutate(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
     try {
       for (InternalAction internalAction : _actions) {
-        internalAction.performAction(reader, writer);
+        internalAction.performAction(searcher, writer);
       }
     } finally {
       _actions.clear();
@@ -397,4 +321,14 @@ public class MutatableAction {
     return new Term(BlurConstants.RECORD_ID, id);
   }
 
+  private synchronized UpdateRow getUpdateRow(String rowId) {
+    UpdateRow updateRow = _rowUpdates.get(rowId);
+    if (updateRow == null) {
+      updateRow = new UpdateRow(rowId, _table, _shard, _maxHeap, _tableContext);
+      _rowUpdates.put(rowId, updateRow);
+      _actions.add(updateRow);
+    }
+    return updateRow;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0fd157b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index 52dd58b..cdb36ef 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -1149,6 +1149,38 @@ public class IndexManagerTest {
   }
 
   @Test
+  public void testMultipleMutationReplaceRecordWithInSameBatch() throws Exception {
+    RowMutation mutation1 = newRowMutation(
+        TABLE,
+        "row-4000",
+        newRecordMutation(FAMILY, "record-4a", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"),
+            newColumn("testcol3", "value4")));
+
+    RowMutation mutation2 = newRowMutation(
+        TABLE,
+        "row-4000",
+        newRecordMutation(FAMILY, "record-4b", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"),
+            newColumn("testcol3", "value4")));
+    mutation1.setRowMutationType(RowMutationType.UPDATE_ROW);
+    mutation2.setRowMutationType(RowMutationType.UPDATE_ROW);
+    indexManager.mutate(Arrays.asList(mutation1, mutation2));
+    Selector selector = new Selector().setRowId("row-4000");
+    FetchResult fetchResult = new FetchResult();
+    indexManager.fetchRow(TABLE, selector, fetchResult);
+    assertNotNull(fetchResult.rowResult.row);
+    Row row = newRow(
+        "row-4000",
+        newRecord(FAMILY, "record-4a", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"),
+            newColumn("testcol3", "value4")),
+        newRecord(FAMILY, "record-4b", newColumn("testcol1", "value2"), newColumn("testcol2", "value3"),
+            newColumn("testcol3", "value4")));
+
+    FetchRowResult rowResult = fetchResult.getRowResult();
+    assertEquals(row, rowResult.getRow());
+    assertEquals(2, rowResult.getTotalRecords());
+  }
+
+  @Test
   public void testMutationReplaceMissingRow() throws Exception {
     Column c1 = newColumn("testcol1", "value20");
     Column c2 = newColumn("testcol2", "value21");


Mime
View raw message