incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixed BLUR-345.
Date Wed, 10 Dec 2014 14:51:38 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master fc8bc600e -> 11a43d7cd


Fixed BLUR-345.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/11a43d7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/11a43d7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/11a43d7c

Branch: refs/heads/master
Commit: 11a43d7cded3508a5127d1ed83a6ad1dc43fc408
Parents: fc8bc60
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Dec 10 09:51:28 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Dec 10 09:51:28 2014 -0500

----------------------------------------------------------------------
 .../blur/manager/writer/MutatableAction.java    | 316 +++++++++++++------
 .../apache/blur/manager/IndexManagerTest.java   |  28 +-
 2 files changed, 246 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/11a43d7c/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
index f933f5b..b5858b4 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
@@ -21,17 +21,19 @@ import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.analysis.FieldManager;
-import org.apache.blur.manager.IndexManager;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
@@ -39,22 +41,30 @@ import org.apache.blur.thrift.BException;
 import org.apache.blur.thrift.MutationHelper;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.FetchResult;
-import org.apache.blur.thrift.generated.FetchRowResult;
+import org.apache.blur.thrift.generated.FetchRecordResult;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.RecordMutation;
 import org.apache.blur.thrift.generated.RecordMutationType;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.RowMutationType;
-import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.util.BytesRef;
 
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Meter;
@@ -74,14 +84,15 @@ public class MutatableAction extends IndexAction {
 
   static abstract class BaseRecordMutatorIterator implements Iterable<Record> {
 
-    private final Record _record;
     private final Iterable<Record> _iterable;
-    private final String _recordId;
+    private final Map<String, Record> _records;
 
-    public BaseRecordMutatorIterator(Iterable<Record> iterable, Record record) {
-      _record = record;
-      _recordId = record.getRecordId();
+    public BaseRecordMutatorIterator(Iterable<Record> iterable, List<Record> records) {
       _iterable = iterable;
+      _records = new TreeMap<String, Record>();
+      for (Record r : records) {
+        _records.put(r.getRecordId(), r);
+      }
     }
 
     protected abstract Record handleRecordMutate(Record existingRecord, Record newRecord);
@@ -91,7 +102,7 @@ public class MutatableAction extends IndexAction {
       final Iterator<Record> iterator = _iterable.iterator();
       return new Iterator<Record>() {
 
-        private boolean _applied = false;
+        private SortedSet<String> _needToBeApplied = new TreeSet<String>(_records.keySet());
         private boolean _append = false;
 
         @Override
@@ -100,23 +111,30 @@ public class MutatableAction extends IndexAction {
           if (hasNext) {
             return true;
           }
-          if (_applied) {
+          if (areAllApplied()) {
             return false; // Already applied changes, finished.
           }
           _append = true;
-          return true; // Still need to add new record.
+          return true; // Still need to add new records.
+        }
+
+        private boolean areAllApplied() {
+          return _needToBeApplied.size() == 0;
         }
 
         @Override
         public Record next() {
           if (_append) {
-            _applied = true;
-            return _record;
+            String first = _needToBeApplied.first();
+            _needToBeApplied.remove(first);
+            return _records.get(first);
           }
           Record record = iterator.next();
-          if (record.getRecordId().equals(_recordId)) {
-            record = handleRecordMutate(record, _record);
-            _applied = true;
+          String recordId = record.getRecordId();
+          Record newRecord = _records.get(recordId);
+          if (newRecord != null) {
+            record = handleRecordMutate(record, newRecord);
+            _needToBeApplied.remove(recordId);
           }
           return record;
         }
@@ -137,42 +155,51 @@ public class MutatableAction extends IndexAction {
     }
 
     private final List<UpdateRowAction> _actions = new ArrayList<UpdateRowAction>();
+
+    private UpdateRowAction _deleteRecordsAction;
+    private final Set<String> _deleteRecordsActionRecordsIdToDelete = new HashSet<String>();
+    private UpdateRowAction _appendColumnsAction;
+    private List<Record> _appendColumnsActionRecords = new ArrayList<Record>();
+    private UpdateRowAction _replaceColumnsAction;
+    private List<Record> _replaceColumnsActionRecords = new ArrayList<Record>();
+    private UpdateRowAction _replaceRecordAction;
+    private List<Record> _replaceRecordActionRecords = new ArrayList<Record>();
+
     private final String _rowId;
-    private final String _table;
-    private final String _shard;
-    private final int _maxHeap;
     private final TableContext _tableContext;
     private final FieldManager _fieldManager;
 
-    UpdateRow(String rowId, String table, String shard, int maxHeap, TableContext tableContext) {
+    UpdateRow(String rowId, TableContext tableContext) {
       _rowId = rowId;
-      _table = table;
-      _shard = shard;
-      _maxHeap = maxHeap;
       _tableContext = tableContext;
       _fieldManager = _tableContext.getFieldManager();
     }
 
     void deleteRecord(final String recordId) {
-      _actions.add(new UpdateRowAction() {
-        @Override
-        IterableRow performAction(IterableRow row) {
-          if (row == null) {
-            return null;
-          } else {
-            return new IterableRow(row.getRowId(), new DeleteRecordIterator(row, recordId));
+      if (_deleteRecordsAction == null) {
+        _deleteRecordsAction = new UpdateRowAction() {
+          @Override
+          IterableRow performAction(IterableRow row) {
+            if (row == null) {
+              return null;
+            } else {
+              return new IterableRow(row.getRowId(), new DeleteRecordIterator(row,
+                  _deleteRecordsActionRecordsIdToDelete));
+            }
           }
-        }
-      });
+        };
+        _actions.add(_deleteRecordsAction);
+      }
+      _deleteRecordsActionRecordsIdToDelete.add(recordId);
     }
 
     static class DeleteRecordIterator implements Iterable<Record> {
 
-      private final String _recordIdToDelete;
+      private final Set<String> _recordsIdToDelete;
       private final Iterable<Record> _iterable;
 
-      public DeleteRecordIterator(Iterable<Record> iterable, String recordIdToDelete) {
-        _recordIdToDelete = recordIdToDelete;
+      public DeleteRecordIterator(Iterable<Record> iterable, Set<String> recordsIdToDelete) {
+        _recordsIdToDelete = recordsIdToDelete;
         _iterable = iterable;
       }
 
@@ -187,7 +214,7 @@ public class MutatableAction extends IndexAction {
             if (record == null) {
               return false;
             }
-            if (record.getRecordId().equals(_recordIdToDelete)) {
+            if (_recordsIdToDelete.contains(record.getRecordId())) {
               iterator.next();// Eat the delete
               return hasNext();// Move to the next record
             }
@@ -209,22 +236,26 @@ public class MutatableAction extends IndexAction {
     }
 
     void appendColumns(final Record record) {
-      _actions.add(new UpdateRowAction() {
-        @Override
-        IterableRow performAction(IterableRow row) {
-          if (row == null) {
-            return new IterableRow(_rowId, Arrays.asList(record));
-          } else {
-            return new IterableRow(row.getRowId(), new AppendColumnsIterator(row, record));
+      if (_appendColumnsAction == null) {
+        _appendColumnsAction = new UpdateRowAction() {
+          @Override
+          IterableRow performAction(IterableRow row) {
+            if (row == null) {
+              return new IterableRow(_rowId, _appendColumnsActionRecords);
+            } else {
+              return new IterableRow(row.getRowId(), new AppendColumnsIterator(row, _appendColumnsActionRecords));
+            }
           }
-        }
-      });
+        };
+        _actions.add(_appendColumnsAction);
+      }
+      _appendColumnsActionRecords.add(record);
     }
 
     static class AppendColumnsIterator extends BaseRecordMutatorIterator {
 
-      public AppendColumnsIterator(Iterable<Record> iterable, Record record) {
-        super(iterable, record);
+      public AppendColumnsIterator(Iterable<Record> iterable, List<Record> records) {
+        super(iterable, records);
       }
 
       @Override
@@ -238,22 +269,26 @@ public class MutatableAction extends IndexAction {
     }
 
     void replaceColumns(final Record record) {
-      _actions.add(new UpdateRowAction() {
-        @Override
-        IterableRow performAction(IterableRow row) {
-          if (row == null) {
-            return new IterableRow(_rowId, Arrays.asList(record));
-          } else {
-            return new IterableRow(_rowId, new ReplaceColumnsIterator(row, record));
+      if (_replaceColumnsAction == null) {
+        _replaceColumnsAction = new UpdateRowAction() {
+          @Override
+          IterableRow performAction(IterableRow row) {
+            if (row == null) {
+              return new IterableRow(_rowId, _replaceColumnsActionRecords);
+            } else {
+              return new IterableRow(row.getRowId(), new ReplaceColumnsIterator(row, _replaceColumnsActionRecords));
+            }
           }
-        }
-      });
+        };
+        _actions.add(_replaceColumnsAction);
+      }
+      _replaceColumnsActionRecords.add(record);
     }
 
     static class ReplaceColumnsIterator extends BaseRecordMutatorIterator {
 
-      public ReplaceColumnsIterator(Iterable<Record> iterable, Record record) {
-        super(iterable, record);
+      public ReplaceColumnsIterator(Iterable<Record> iterable, List<Record> records) {
+        super(iterable, records);
       }
 
       @Override
@@ -297,24 +332,28 @@ public class MutatableAction extends IndexAction {
     }
 
     void replaceRecord(final Record record) {
-      _actions.add(new UpdateRowAction() {
-        @Override
-        IterableRow performAction(IterableRow row) {
-          if (row == null) {
-            // New Row
-            return new IterableRow(_rowId, Arrays.asList(record));
-          } else {
-            // Existing Row
-            return new IterableRow(_rowId, new ReplaceRecordIterator(row, record));
+      if (_replaceRecordAction == null) {
+        _replaceRecordAction = new UpdateRowAction() {
+          @Override
+          IterableRow performAction(IterableRow row) {
+            if (row == null) {
+              // New Row
+              return new IterableRow(_rowId, _replaceRecordActionRecords);
+            } else {
+              // Existing Row
+              return new IterableRow(row.getRowId(), new ReplaceRecordIterator(row, _replaceRecordActionRecords));
+            }
           }
-        }
-      });
+        };
+        _actions.add(_replaceRecordAction);
+      }
+      _replaceRecordActionRecords.add(record);
     }
 
     static class ReplaceRecordIterator extends BaseRecordMutatorIterator {
 
-      public ReplaceRecordIterator(Iterable<Record> iterable, Record record) {
-        super(iterable, record);
+      public ReplaceRecordIterator(Iterable<Record> iterable, List<Record> records) {
+        super(iterable, records);
       }
 
       @Override
@@ -326,20 +365,7 @@ public class MutatableAction extends IndexAction {
 
     @Override
     void performAction(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
-      Selector selector = new Selector();
-      selector.setRowId(_rowId);
-      IndexManager.populateSelector(searcher, _shard, _table, selector);
-      IterableRow iterableRow = null;
-      if (!selector.getLocationId().equals(IndexManager.NOT_FOUND)) {
-        FetchResult fetchResult = new FetchResult();
-        IndexManager.fetchRow(searcher.getIndexReader(), _table, _shard, selector, fetchResult, null, null, _maxHeap,
-            _tableContext, null);
-        FetchRowResult rowResult = fetchResult.getRowResult();
-        if (rowResult != null) {
-          Row r = rowResult.getRow();
-          iterableRow = new IterableRow(r.getId(), r.getRecords());
-        }
-      }
+      IterableRow iterableRow = getIterableRow(_rowId, searcher);
       for (UpdateRowAction action : _actions) {
         iterableRow = action.performAction(iterableRow);
       }
@@ -363,6 +389,113 @@ public class MutatableAction extends IndexAction {
       _writeRowMeter.mark();
     }
 
+    private static class AtomicReaderTermsEnum {
+      AtomicReader _atomicReader;
+      TermsEnum _termsEnum;
+
+      AtomicReaderTermsEnum(AtomicReader atomicReader, TermsEnum termsEnum) {
+        _atomicReader = atomicReader;
+        _termsEnum = termsEnum;
+      }
+    }
+
+    private IterableRow getIterableRow(String rowId, IndexSearcherClosable searcher) throws IOException {
+      IndexReader indexReader = searcher.getIndexReader();
+      BytesRef rowIdRef = new BytesRef(rowId);
+      List<AtomicReaderTermsEnum> possibleRowIds = new ArrayList<AtomicReaderTermsEnum>();
+      for (AtomicReaderContext atomicReaderContext : indexReader.leaves()) {
+        AtomicReader atomicReader = atomicReaderContext.reader();
+        Fields fields = atomicReader.fields();
+        if (fields == null) {
+          continue;
+        }
+        Terms terms = fields.terms(BlurConstants.ROW_ID);
+        if (terms == null) {
+          continue;
+        }
+        TermsEnum termsEnum = terms.iterator(null);
+        if (!termsEnum.seekExact(rowIdRef, true)) {
+          continue;
+        }
+        // need atomic read as well...
+        possibleRowIds.add(new AtomicReaderTermsEnum(atomicReader, termsEnum));
+      }
+      if (possibleRowIds.isEmpty()) {
+        return null;
+      }
+      return new IterableRow(rowId, getRecords(possibleRowIds));
+    }
+
+    private Iterable<Record> getRecords(final List<AtomicReaderTermsEnum> possibleRowIds) {
+      return new Iterable<Record>() {
+        @Override
+        public Iterator<Record> iterator() {
+          final List<DocsEnum> docsEnums = new ArrayList<DocsEnum>();
+          for (AtomicReaderTermsEnum atomicReaderTermsEnum : possibleRowIds) {
+            try {
+              docsEnums.add(atomicReaderTermsEnum._termsEnum.docs(atomicReaderTermsEnum._atomicReader.getLiveDocs(),
+                  null));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          return new Iterator<Record>() {
+
+            private int _index = 0;
+            private boolean _nextCalled;
+            private int _docId;
+
+            @Override
+            public boolean hasNext() {
+              try {
+                if (_nextCalled) {
+                  if (_docId == DocIdSetIterator.NO_MORE_DOCS) {
+                    return false;
+                  }
+                  return true;
+                }
+                while (true) {
+                  if (_index >= docsEnums.size()) {
+                    _nextCalled = true;
+                    _docId = DocIdSetIterator.NO_MORE_DOCS;
+                    return false;
+                  }
+                  DocsEnum docsEnum = docsEnums.get(_index);
+                  int docId = docsEnum.nextDoc();
+                  if (docId != DocIdSetIterator.NO_MORE_DOCS) {
+                    _nextCalled = true;
+                    _docId = docId;
+                    return true;
+                  }
+                  _index++;
+                }
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+
+            @Override
+            public Record next() {
+              _nextCalled = false;
+              AtomicReaderTermsEnum atomicReaderTermsEnum = possibleRowIds.get(_index);
+              try {
+                Document document = atomicReaderTermsEnum._atomicReader.document(_docId);
+                FetchRecordResult fetchRecordResult = RowDocumentUtil.getRecord(document);
+                return fetchRecordResult.getRecord();
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+
+            @Override
+            public void remove() {
+              throw new RuntimeException("Not Supported.");
+            }
+          };
+        }
+      };
+    }
+
     Iterable<Iterable<Field>> wrapPrimeDoc(final Iterable<Iterable<Field>> iterable) {
       return new Iterable<Iterable<Field>>() {
 
@@ -413,15 +546,10 @@ public class MutatableAction extends IndexAction {
   private final List<InternalAction> _actions = new ArrayList<InternalAction>();
   private final Map<String, UpdateRow> _rowUpdates = new HashMap<String, UpdateRow>();
   private final FieldManager _fieldManager;
-  private final String _shard;
-  private final String _table;
-  private final int _maxHeap = Integer.MAX_VALUE;
-  private TableContext _tableContext;
+  private final TableContext _tableContext;
 
   public MutatableAction(ShardContext context) {
     _tableContext = context.getTableContext();
-    _shard = context.getShard();
-    _table = _tableContext.getTable();
     _fieldManager = _tableContext.getFieldManager();
   }
 
@@ -490,7 +618,7 @@ public class MutatableAction extends IndexAction {
   private synchronized UpdateRow getUpdateRow(String rowId) {
     UpdateRow updateRow = _rowUpdates.get(rowId);
     if (updateRow == null) {
-      updateRow = new UpdateRow(rowId, _table, _shard, _maxHeap, _tableContext);
+      updateRow = new UpdateRow(rowId, _tableContext);
       _rowUpdates.put(rowId, updateRow);
       _actions.add(updateRow);
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/11a43d7c/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index 1258bed..e91d16f 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -287,7 +287,7 @@ public class IndexManagerTest {
   @Test
   public void testMutationReplaceLargeRow() throws Exception {
     final String rowId = "largerow";
-    indexManager.mutate(getLargeRow(rowId));
+    indexManager.mutate(getLargeRow(rowId, RowMutationType.REPLACE_ROW, 10000));
     TraceStorage oldReporter = Trace.getStorage();
     Trace.setStorage(new BaseTraceStorage(new BlurConfiguration()) {
 
@@ -341,12 +341,32 @@ public class IndexManagerTest {
 
   }
 
-  private RowMutation getLargeRow(String rowId) {
+  @Test
+  public void testMutationAppendLargeRow() throws Exception {
+    final String rowId = "largerowappend";
+    int batch = 2;
+    int batchSize = 10000;
+    for (int i = 0; i < batch; i++) {
+      System.out.println("Adding Batch [" + i + "]");
+      indexManager.mutate(getLargeRow(rowId, RowMutationType.UPDATE_ROW, batchSize));
+    }
+
+    FetchResult fetchResult = new FetchResult();
+    Selector selector = new Selector();
+    selector.setRowId(rowId);
+    indexManager.fetchRow(TABLE, selector, fetchResult);
+
+    FetchRowResult fetchRowResult = fetchResult.getRowResult();
+    System.out.println(fetchRowResult.getTotalRecords());
+    assertEquals(batch * batchSize, fetchRowResult.getTotalRecords());
+  }
+
+  private RowMutation getLargeRow(String rowId, RowMutationType rowMutationType, int count) {
     RowMutation rowMutation = new RowMutation();
     rowMutation.setTable(TABLE);
     rowMutation.setRowId(rowId);
-    rowMutation.setRecordMutations(getRecordMutations(10000));
-    rowMutation.setRowMutationType(RowMutationType.REPLACE_ROW);
+    rowMutation.setRecordMutations(getRecordMutations(count));
+    rowMutation.setRowMutationType(rowMutationType);
     return rowMutation;
   }
 


Mime
View raw message