incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Api changes to allow for streaming reindex of entire row regardless of size.
Date Tue, 09 Dec 2014 21:52:04 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master a829afe61 -> 6b0b62d90


Api changes to allow for streaming reindex of entire row regardless of size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/6b0b62d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/6b0b62d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/6b0b62d9

Branch: refs/heads/master
Commit: 6b0b62d904eefbcbb63b626abd25590af8cd2c28
Parents: a829afe
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Dec 9 16:52:05 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Dec 9 16:52:05 2014 -0500

----------------------------------------------------------------------
 .../manager/writer/GenericPeekableIterator.java |  71 ++++++++
 .../blur/manager/writer/IterablePlusOne.java    |  62 +++++++
 .../apache/blur/manager/writer/IterableRow.java |  63 +++++++
 .../blur/manager/writer/MutatableAction.java    | 166 +++++++++++--------
 .../writer/RecordToDocumentIterable.java        |  79 +++++++++
 5 files changed, 376 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6b0b62d9/blur-core/src/main/java/org/apache/blur/manager/writer/GenericPeekableIterator.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/GenericPeekableIterator.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/GenericPeekableIterator.java
new file mode 100644
index 0000000..13f08a8
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/GenericPeekableIterator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.util.Iterator;
+
/**
 * An {@link Iterator} decorator that reads one element ahead so callers can
 * inspect the upcoming element with {@link #peek()} without consuming it.
 * <p>
 * The wrapped iterator is advanced eagerly. One element is pulled when the
 * wrapper is created, and one more each time {@link #next()} is called.
 * {@code null} elements from the wrapped iterator are supported. Use
 * {@link #hasNext()} rather than {@code peek() != null} to test for exhaustion
 * when the source may yield {@code null}.
 *
 * @param <T> element type
 */
public class GenericPeekableIterator<T> implements Iterator<T> {

  private final Iterator<T> _iterator;
  // The buffered look-ahead element; only meaningful while _hasCurrent is true.
  private T _current;
  // Tracked explicitly so that a null element is not mistaken for exhaustion.
  private boolean _hasCurrent;

  private GenericPeekableIterator(Iterator<T> iterator) {
    _iterator = iterator;
    advance();
  }

  /**
   * Wraps {@code iterator}, immediately buffering its first element (if any).
   * The type parameter {@code E} is unused and retained only for source
   * compatibility with callers that supply explicit type arguments.
   *
   * @param iterator the source iterator; must not be {@code null}
   * @return a peekable view of {@code iterator}
   */
  public static <T, E extends Exception> GenericPeekableIterator<T> wrap(Iterator<T> iterator) {
    return new GenericPeekableIterator<T>(iterator);
  }

  /**
   * Returns the element the next call to {@link #next()} would return, without
   * consuming it. Returns {@code null} when the iterator is exhausted. It also
   * returns {@code null} when the upcoming element is itself {@code null}, so
   * use {@link #hasNext()} to tell the two apart.
   *
   * @return the upcoming element, or {@code null}
   */
  public T peek() {
    return _hasCurrent ? _current : null;
  }

  @Override
  public boolean hasNext() {
    return _hasCurrent;
  }

  /**
   * Returns the buffered element and buffers the following one.
   *
   * @throws java.util.NoSuchElementException if the iterator is exhausted
   */
  @Override
  public T next() {
    if (!_hasCurrent) {
      throw new java.util.NoSuchElementException();
    }
    T result = _current;
    advance();
    return result;
  }

  /** Pulls the next element from the wrapped iterator into the look-ahead slot. */
  private void advance() {
    if (_iterator.hasNext()) {
      _current = _iterator.next();
      _hasCurrent = true;
    } else {
      // Drop the reference so the last element is not retained after exhaustion.
      _current = null;
      _hasCurrent = false;
    }
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("Not Supported.");
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6b0b62d9/blur-core/src/main/java/org/apache/blur/manager/writer/IterablePlusOne.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IterablePlusOne.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IterablePlusOne.java
new file mode 100644
index 0000000..d0a5c1d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IterablePlusOne.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.util.Iterator;
+
/**
 * An {@link Iterable} that yields a single leading element followed by every
 * element of another {@link Iterable}. Each call to {@link #iterator()} starts
 * a fresh pass over both, so this is re-iterable whenever {@code many} is.
 *
 * @param <T> element type
 */
public class IterablePlusOne<T> implements Iterable<T> {

  private final T _one;
  private final Iterable<? extends T> _many;

  /**
   * @param one  the element returned first (may be {@code null})
   * @param many the elements returned after {@code one}; must not be {@code null}
   */
  public IterablePlusOne(T one, Iterable<? extends T> many) {
    _one = one;
    _many = many;
  }

  @Override
  public Iterator<T> iterator() {
    final Iterator<? extends T> iterator = _many.iterator();
    return new Iterator<T>() {

      // False until the leading element has been handed out.
      private boolean _onePickedUp = false;

      @Override
      public boolean hasNext() {
        return !_onePickedUp || iterator.hasNext();
      }

      @Override
      public T next() {
        if (!_onePickedUp) {
          _onePickedUp = true;
          return _one;
        }
        // Exhaustion is signalled by the wrapped iterator, per the Iterator contract.
        return iterator.next();
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("Not Supported.");
      }
    };
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6b0b62d9/blur-core/src/main/java/org/apache/blur/manager/writer/IterableRow.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IterableRow.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IterableRow.java
new file mode 100644
index 0000000..e3a06b5
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IterableRow.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.util.Iterator;
+
+import org.apache.blur.thrift.generated.Record;
+
/**
 * A row id paired with an {@link Iterable} of its {@link Record}s, so a row's
 * records can be streamed rather than held in a single materialized list.
 */
public class IterableRow implements Iterable<Record> {

  /** An {@link Iterable} that never yields any records. */
  public static final Iterable<Record> EMPTY = new Iterable<Record>() {
    @Override
    public Iterator<Record> iterator() {
      return new Iterator<Record>() {
        @Override
        public void remove() {
          throw new UnsupportedOperationException("Not Supported.");
        }

        @Override
        public Record next() {
          // NoSuchElementException is the Iterator contract's exhaustion signal
          // (and still a RuntimeException, so existing catch sites are unaffected).
          throw new java.util.NoSuchElementException("Empty");
        }

        @Override
        public boolean hasNext() {
          return false;
        }
      };
    }
  };

  private final String _rowId;
  private final Iterable<Record> _records;

  /**
   * @param rowId   the id returned by {@link #getRowId()}
   * @param records the row's records; {@code null} is treated as an empty row so
   *                callers can pass a possibly-unset record list directly
   */
  public IterableRow(String rowId, Iterable<Record> records) {
    _rowId = rowId;
    _records = records == null ? EMPTY : records;
  }

  /** @return the row id this instance was constructed with */
  public String getRowId() {
    return _rowId;
  }

  /** @return an iterator over the row's records (never {@code null}) */
  @Override
  public Iterator<Record> iterator() {
    return _records.iterator();
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6b0b62d9/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
index b245545..5cd13fa 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
@@ -21,8 +21,10 @@ import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -49,6 +51,8 @@ import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.RowDocumentUtil;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 
@@ -71,7 +75,7 @@ public class MutatableAction extends IndexAction {
   static class UpdateRow extends InternalAction {
 
     static abstract class UpdateRowAction {
-      abstract Row performAction(Row row);
+      abstract IterableRow performAction(IterableRow row);
     }
 
     private final List<UpdateRowAction> _actions = new ArrayList<UpdateRowAction>();
@@ -94,21 +98,17 @@ public class MutatableAction extends IndexAction {
     void deleteRecord(final String recordId) {
       _actions.add(new UpdateRowAction() {
         @Override
-        Row performAction(Row row) {
+        IterableRow performAction(IterableRow row) {
           if (row == null) {
             return null;
           } else {
-            if (row.getRecords() == null) {
-              return row;
-            }
-            Row result = new Row();
-            result.setId(row.getId());
-            for (Record record : row.getRecords()) {
+            List<Record> records = new ArrayList<Record>();
+            for (Record record : row) {
               if (!record.getRecordId().equals(recordId)) {
-                result.addToRecords(record);
+                records.add(record);
               }
             }
-            return result;
+            return new IterableRow(row.getRowId(), records);
           }
         }
       });
@@ -117,32 +117,27 @@ public class MutatableAction extends IndexAction {
     void appendColumns(final Record record) {
       _actions.add(new UpdateRowAction() {
         @Override
-        Row performAction(Row row) {
+        IterableRow performAction(IterableRow row) {
           if (row == null) {
-            row = new Row(_rowId, null);
-            row.addToRecords(record);
-            return row;
+            return new IterableRow(_rowId, Arrays.asList(record));
           } else {
-            Row result = new Row();
-            result.setId(row.getId());
             String recordId = record.getRecordId();
+            List<Record> records = new ArrayList<Record>();
             boolean found = false;
-            if (row.getRecords() != null) {
-              for (Record r : row.getRecords()) {
-                if (!r.getRecordId().equals(recordId)) {
-                  result.addToRecords(r);
-                } else {
-                  found = true;
-                  // Append columns
-                  r.getColumns().addAll(record.getColumns());
-                  result.addToRecords(r);
-                }
+            for (Record r : row) {
+              if (!r.getRecordId().equals(recordId)) {
+                records.add(r);
+              } else {
+                found = true;
+                // Append columns
+                r.getColumns().addAll(record.getColumns());
+                records.add(r);
               }
             }
             if (!found) {
-              result.addToRecords(record);
+              records.add(record);
             }
-            return result;
+            return new IterableRow(_rowId, records);
           }
         }
       });
@@ -151,31 +146,26 @@ public class MutatableAction extends IndexAction {
     void replaceColumns(final Record record) {
       _actions.add(new UpdateRowAction() {
         @Override
-        Row performAction(Row row) {
+        IterableRow performAction(IterableRow row) {
           if (row == null) {
-            row = new Row(_rowId, null);
-            row.addToRecords(record);
-            return row;
+            return new IterableRow(_rowId, Arrays.asList(record));
           } else {
-            Row result = new Row();
-            result.setId(row.getId());
             String recordId = record.getRecordId();
             boolean found = false;
-            if (row.getRecords() != null) {
-              for (Record r : row.getRecords()) {
-                if (!r.getRecordId().equals(recordId)) {
-                  result.addToRecords(r);
-                } else {
-                  found = true;
-                  // Replace columns
-                  result.addToRecords(replaceColumns(r, record));
-                }
+            List<Record> records = new ArrayList<Record>();
+            for (Record r : row) {
+              if (!r.getRecordId().equals(recordId)) {
+                records.add(r);
+              } else {
+                found = true;
+                // Replace columns
+                records.add(replaceColumns(r, record));
               }
             }
             if (!found) {
-              result.addToRecords(record);
+              records.add(record);
             }
-            return result;
+            return new IterableRow(_rowId, records);
           }
         }
       });
@@ -217,25 +207,20 @@ public class MutatableAction extends IndexAction {
     void replaceRecord(final Record record) {
       _actions.add(new UpdateRowAction() {
         @Override
-        Row performAction(Row row) {
+        IterableRow performAction(IterableRow row) {
           if (row == null) {
-            row = new Row(_rowId, null);
-            row.addToRecords(record);
-            return row;
+            return new IterableRow(_rowId, Arrays.asList(record));
           } else {
-            Row result = new Row();
-            result.setId(row.getId());
+            List<Record> records = new ArrayList<Record>();
             String recordId = record.getRecordId();
-            if (row.getRecords() != null) {
-              for (Record r : row.getRecords()) {
-                if (!r.getRecordId().equals(recordId)) {
-                  result.addToRecords(r);
-                }
+            for (Record r : row) {
+              if (!r.getRecordId().equals(recordId)) {
+                records.add(r);
               }
             }
             // Add replacement
-            result.addToRecords(record);
-            return result;
+            records.add(record);
+            return new IterableRow(_rowId, records);
           }
         }
       });
@@ -246,30 +231,81 @@ public class MutatableAction extends IndexAction {
       Selector selector = new Selector();
       selector.setRowId(_rowId);
       IndexManager.populateSelector(searcher, _shard, _table, selector);
-      Row row = null;
+      IterableRow iterableRow = null;
       if (!selector.getLocationId().equals(IndexManager.NOT_FOUND)) {
         FetchResult fetchResult = new FetchResult();
         IndexManager.fetchRow(searcher.getIndexReader(), _table, _shard, selector, fetchResult,
null, null, _maxHeap,
             _tableContext, null);
         FetchRowResult rowResult = fetchResult.getRowResult();
         if (rowResult != null) {
-          row = rowResult.getRow();
+          Row r = rowResult.getRow();
+          iterableRow = new IterableRow(r.getId(), r.getRecords());
         }
       }
       for (UpdateRowAction action : _actions) {
-        row = action.performAction(row);
+        iterableRow = action.performAction(iterableRow);
       }
       Term term = createRowId(_rowId);
-      if (row != null && row.getRecords() != null && row.getRecords().size()
> 0) {
-        List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
-        writer.updateDocuments(term, docsToUpdate);
-        _writeRecordsMeter.mark(docsToUpdate.size());
+      // if (iterableRow != null) {
+      RecordToDocumentIterable docsToUpdate = new RecordToDocumentIterable(iterableRow, _fieldManager);
+      Iterator<Iterable<Field>> iterator = docsToUpdate.iterator();
+      final GenericPeekableIterator<Iterable<Field>> gpi = GenericPeekableIterator.wrap(iterator);
+      if (gpi.peek() != null) {
+        writer.updateDocuments(term, wrapPrimeDoc(new Iterable<Iterable<Field>>()
{
+          @Override
+          public Iterator<Iterable<Field>> iterator() {
+            return gpi;
+          }
+        }));
       } else {
         writer.deleteDocuments(term);
       }
+      _writeRecordsMeter.mark(docsToUpdate.count());
+      // }
       _writeRowMeter.mark();
     }
 
+    Iterable<Iterable<Field>> wrapPrimeDoc(final Iterable<Iterable<Field>>
iterable) {
+      return new Iterable<Iterable<Field>>() {
+
+        @Override
+        public Iterator<Iterable<Field>> iterator() {
+          final Iterator<Iterable<Field>> iterator = iterable.iterator();
+          return new Iterator<Iterable<Field>>() {
+
+            private boolean _first = true;
+
+            @Override
+            public boolean hasNext() {
+              return iterator.hasNext();
+            }
+
+            @Override
+            public Iterable<Field> next() {
+              Iterable<Field> fields = iterator.next();
+              if (_first) {
+                _first = false;
+                return addPrimeDocField(fields);
+              } else {
+                return fields;
+              }
+            }
+
+            private Iterable<Field> addPrimeDocField(Iterable<Field> fields)
{
+              return new IterablePlusOne<Field>(new StringField(BlurConstants.PRIME_DOC,
BlurConstants.PRIME_DOC_VALUE,
+                  Store.NO), fields);
+            }
+
+            @Override
+            public void remove() {
+              throw new RuntimeException("Not Supported.");
+            }
+
+          };
+        }
+      };
+    }
+
   }
 
   static abstract class InternalAction {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6b0b62d9/blur-core/src/main/java/org/apache/blur/manager/writer/RecordToDocumentIterable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/RecordToDocumentIterable.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/RecordToDocumentIterable.java
new file mode 100644
index 0000000..d198938
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/RecordToDocumentIterable.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.blur.analysis.FieldManager;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.lucene.document.Field;
+
+public class RecordToDocumentIterable implements Iterable<Iterable<Field>> {
+
+  private long _count;
+  private final IterableRow _row;
+  private final FieldManager _fieldManager;
+
+  public RecordToDocumentIterable(IterableRow row, FieldManager fieldManager) {
+    _row = row;
+    _fieldManager = fieldManager;
+  }
+
+  @Override
+  public Iterator<Iterable<Field>> iterator() {
+    final Iterator<Record> iterator = _row.iterator();
+    return new Iterator<Iterable<Field>>() {
+      private long count = 0;
+
+      @Override
+      public boolean hasNext() {
+        boolean hasNext = iterator.hasNext();
+        if (!hasNext) {
+          _count = count;
+        }
+        return hasNext;
+      }
+
+      @Override
+      public Iterable<Field> next() {
+        Record record = iterator.next();
+        count++;
+        try {
+          return convert(record);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new RuntimeException("Not Supported.");
+      }
+    };
+  }
+
+  public Iterable<Field> convert(Record record) throws IOException {
+    return _fieldManager.getFields(_row.getRowId(), record);
+  }
+
+  public long count() {
+    return _count;
+  }
+
+}


Mime
View raw message