incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/14] git commit: Working on getting the write ahead log working again.
Date Sun, 04 Nov 2012 02:17:15 GMT
Working on getting the write ahead log working again.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/a0b1b602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/a0b1b602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/a0b1b602

Branch: refs/heads/0.2-dev
Commit: a0b1b6024a91d6b558b08fad37eefa0948c762a9
Parents: 4e18a55
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Nov 3 21:55:45 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Nov 3 21:55:45 2012 -0400

----------------------------------------------------------------------
 .../org/apache/blur/manager/writer/BlurIndex.java  |    5 +-
 .../blur/manager/writer/BlurIndexReader.java       |   11 +-
 .../apache/blur/manager/writer/BlurNRTIndex.java   |   24 +-
 .../blur/manager/writer/TransactionRecorder.java   |  329 ++++++---------
 .../org/apache/blur/thrift/BlurShardServer.java    |    5 +-
 .../org/apache/blur/thrift/lucene/Convert.java     |   10 +-
 6 files changed, 169 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0b1b602/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index 6cd8e41..39faa4e 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.Term;
+import org.apache.blur.thrift.generated.UpdatePackage;
 import org.apache.lucene.index.IndexReader;
 
 public abstract class BlurIndex {
@@ -43,8 +44,8 @@ public abstract class BlurIndex {
   public abstract void optimize(int numberOfSegmentsPerShard) throws IOException;
 
  public abstract void addDocuments(boolean waitToBeVisible, boolean wal, List<Document> documents) throws IOException;
-  public abstract void updateDocuments(boolean waitToBeVisible, boolean wal, Term deleteTerm, List<Document> documents) throws IOException;
-  public abstract void deleteDocuments(boolean waitToBeVisible, boolean wal, Term... deleteTerm) throws IOException;
+  public abstract void deleteDocuments(boolean waitToBeVisible, boolean wal, Term... deleteTerms) throws IOException;
  public abstract void deleteDocuments(boolean waitToBeVisible, boolean wal, ByteBuffer... deleteQueries) throws IOException;
+  public abstract void updateDocuments(boolean waitToBeVisible, boolean wal, List<UpdatePackage> updatePackages) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0b1b602/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index deba51e..ee40331 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -28,6 +28,7 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.Term;
+import org.apache.blur.thrift.generated.UpdatePackage;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -80,11 +81,6 @@ public class BlurIndexReader extends AbstractBlurIndex {
   }
 
   @Override
-  public void updateDocuments(boolean waitToBeVisible, boolean wal, Term deleteTerm, List<Document> documents) throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
   public void deleteDocuments(boolean waitToBeVisible, boolean wal, Term... deleteTerm) throws IOException {
     throw new RuntimeException("Read-only shard");
   }
@@ -93,4 +89,9 @@ public class BlurIndexReader extends AbstractBlurIndex {
   public void deleteDocuments(boolean waitToBeVisible, boolean wal, ByteBuffer... deleteQueries) throws IOException {
     throw new RuntimeException("Read-only shard"); 
   }
+
+  @Override
+  public void updateDocuments(boolean waitToBeVisible, boolean wal, List<UpdatePackage> updatePackages) throws IOException {
+    throw new RuntimeException("Read-only shard");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0b1b602/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 7e9c277..4d6f6d5 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -35,6 +35,7 @@ import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.Term;
+import org.apache.blur.thrift.generated.UpdatePackage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.codecs.appending.AppendingCodec;
@@ -135,12 +136,6 @@ public class BlurNRTIndex extends BlurIndex {
   }
 
   @Override
-  public void updateDocuments(boolean waitToBeVisible, boolean wal, Term deleteTerm, List<Document> documents) throws IOException {
-    long generation = _recorder.updateDocuments(wal, deleteTerm, documents, _trackingWriter);
-    waitToBeVisible(waitToBeVisible, generation);
-  }
-
-  @Override
   public void deleteDocuments(boolean waitToBeVisible, boolean wal, Term... deleteTerm) throws IOException {
     long generation = _recorder.deleteDocuments(wal, deleteTerm, _trackingWriter);
     waitToBeVisible(waitToBeVisible, generation);
@@ -151,22 +146,21 @@ public class BlurNRTIndex extends BlurIndex {
     long generation = _recorder.deleteDocuments(wal, deleteQueries, _trackingWriter);
     waitToBeVisible(waitToBeVisible, generation);
   }
+  
+  @Override
+  public void updateDocuments(boolean waitToBeVisible, boolean wal, List<UpdatePackage> updatePackages) throws IOException {
+    long generation = _recorder.updateDocuments(wal, updatePackages, _trackingWriter);  
 
+    waitToBeVisible(waitToBeVisible, generation);
+  }
 
   @Override
   public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
-    List<Record> records = row.records;
-    if (records == null || records.isEmpty()) {
-      deleteRow(waitToBeVisible, wal, row.id);
-      return;
-    }
-    long generation = _recorder.replaceRow(wal, row, _trackingWriter);
-    waitToBeVisible(waitToBeVisible, generation);
+    throw new RuntimeException("No longer supported");
   }
 
   @Override
   public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
-    long generation = _recorder.deleteRow(wal, rowId, _trackingWriter);
-    waitToBeVisible(waitToBeVisible, generation);
+    throw new RuntimeException("No longer supported");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0b1b602/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
index 8afe293..72d19d1 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -16,10 +16,10 @@ package org.apache.blur.manager.writer;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.apache.blur.thrift.lucene.Convert.toLucene;
+
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -34,32 +34,27 @@ import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.RowIndexWriter;
+import org.apache.blur.thrift.generated.Document;
+import org.apache.blur.thrift.generated.Term;
+import org.apache.blur.thrift.generated.UpdatePackage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.record.Utils;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
-import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.Term;
 import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
-import static org.apache.blur.thrift.lucene.Convert.*;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
 
 public class TransactionRecorder {
 
   enum TYPE {
-    DELETE((byte) 0), ROW((byte) 1);
+    DELETE_TERM((byte) 0), DELETE_QUERY((byte) 1), UPDATE_DOCUMENTS((byte) 2), ADD_DOCUMENTS((byte) 3);
     private byte b;
 
     private TYPE(byte b) {
@@ -73,9 +68,13 @@ public class TransactionRecorder {
     public static TYPE lookup(byte b) {
       switch (b) {
       case 0:
-        return DELETE;
+        return DELETE_TERM;
       case 1:
-        return ROW;
+        return DELETE_QUERY;
+      case 2:
+        return UPDATE_DOCUMENTS;
+      case 3:
+        return ADD_DOCUMENTS;
       default:
         throw new RuntimeException("Type not found [" + b + "]");
       }
@@ -131,178 +130,150 @@ public class TransactionRecorder {
   }
 
   private void replay(IndexWriter writer, DataInputStream inputStream) throws CorruptIndexException, IOException {
-    long updateCount = 0;
-    long deleteCount = 0;
+    long addDocumentCount = 0;
+    long updateDocumentCount = 0;
+    long deleteTermCount = 0;
+    long deleteQueriesCount = 0;
     byte[] buffer;
     while ((buffer = readBuffer(inputStream)) != null) {
       DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(buffer));
       TYPE lookup = TYPE.lookup(dataInputStream.readByte());
       switch (lookup) {
-      case ROW:
-        Row row = readRow(dataInputStream);
-        writer.updateDocuments(createRowId(row.id), getDocs(row, analyzer));
-        updateCount++;
-        continue;
-      case DELETE:
-        String deleteRowId = readString(dataInputStream);
-        writer.deleteDocuments(createRowId(deleteRowId));
-        deleteCount++;
-        continue;
+      case ADD_DOCUMENTS:
+        List<org.apache.blur.thrift.generated.Document> documents = readAddDocumentsFromWal(dataInputStream);
+        writer.addDocuments(toLucene(documents));
+        addDocumentCount += documents.size();
+        break;
+      case UPDATE_DOCUMENTS:
+        List<org.apache.blur.thrift.generated.UpdatePackage> updatePackages = readUpdatePackagesFromWal(dataInputStream);
+        for (UpdatePackage updatePackage : updatePackages) {
+          List<org.apache.blur.thrift.generated.Document> docs = updatePackage.getDocuments();
+          writer.updateDocuments(toLucene(updatePackage.getTerm()), toLucene(docs));
+          updateDocumentCount += docs.size();
+        }
+        break;
+      case DELETE_QUERY:
+        ByteBuffer[] deleteQueries = readDeleteQueriesFromWal(dataInputStream);
+        writer.deleteDocuments(toLucene(deleteQueries));
+        deleteQueriesCount += deleteQueries.length;
+        break;
+      case DELETE_TERM:
+        Term[] deleteTerms = readDeleteTermsFromWal(dataInputStream);
+        writer.deleteDocuments(toLucene(deleteTerms));
+        deleteTermCount += deleteTerms.length;
+        break;
       default:
         LOG.error("Unknown type [{0}]", lookup);
         throw new IOException("Unknown type [" + lookup + "]");
       }
     }
-    LOG.info("Rows reclaimed form the WAL [{0}]", updateCount);
-    LOG.info("Deletes reclaimed form the WAL [{0}]", deleteCount);
+    LOG.info("Add Documents reclaimed form the WAL [{0}]", addDocumentCount);
+    LOG.info("Update Documents reclaimed form the WAL [{0}]", updateDocumentCount);
+    LOG.info("Delete Queries reclaimed form the WAL [{0}]", deleteQueriesCount);
+    LOG.info("Delete Terms reclaimed form the WAL [{0}]", deleteTermCount);
   }
 
-  private byte[] readBuffer(DataInputStream inputStream) {
-    try {
-      int length = inputStream.readInt();
-      byte[] buffer = new byte[length];
-      inputStream.readFully(buffer);
-      return buffer;
-    } catch (IOException e) {
-      if (e instanceof EOFException) {
-        return null;
+  private List<UpdatePackage> readUpdatePackagesFromWal(DataInputStream in) throws IOException {
+    int size = in.readInt();
+    List<UpdatePackage> result = new ArrayList<UpdatePackage>(size);
+    TIOStreamTransport transport = new TIOStreamTransport(in);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    for (int i = 0; i < size; i++) {
+      UpdatePackage updatePackage = new UpdatePackage();
+      try {
+        updatePackage.read(protocol);
+      } catch (TException e) {
+        throw new IOException(e);
       }
-      e.printStackTrace();
+      result.add(updatePackage);
     }
-    return null;
+    return result;
   }
 
-  private void rollLog() throws IOException {
-    LOG.info("Rolling WAL path [" + walPath + "]");
-    FSDataOutputStream os = outputStream.get();
-    if (os != null) {
-      os.close();
-    }
-    fileSystem.delete(walPath, false);
-    open();
-  }
+  private void writeUpdateDocumentsToWal(List<UpdatePackage> updatePackages) {
 
-  public void close() throws IOException {
-    synchronized (running) {
-      running.set(false);
-    }
-    outputStream.get().close();
   }
 
-  private static void writeRow(DataOutputStream outputStream, Row row) throws IOException {
-    writeString(outputStream, row.id);
-    List<Record> records = row.records;
-    int size = records.size();
-    outputStream.writeInt(size);
-    for (int i = 0; i < size; i++) {
-      Record record = records.get(i);
-      writeRecord(outputStream, record);
-    }
+  private ByteBuffer[] readDeleteQueriesFromWal(DataInputStream in) {
+    return null;
   }
 
-  private static Row readRow(DataInputStream inputStream) throws IOException {
-    Row row = new Row();
-    row.id = readString(inputStream);
-    int size = inputStream.readInt();
-    for (int i = 0; i < size; i++) {
-      row.addToRecords(readRecord(inputStream));
-    }
-    return row;
-  }
+  private void writeDeleteQueriesToWal(ByteBuffer[] deleteQueries) {
 
-  private static void writeRecord(DataOutputStream outputStream, Record record) throws IOException {
-    writeString(outputStream, record.recordId);
-    writeString(outputStream, record.family);
-    List<Column> columns = record.columns;
-    int size = columns.size();
-    outputStream.writeInt(size);
-    for (int i = 0; i < size; i++) {
-      writeColumn(outputStream, columns.get(i));
-    }
   }
 
-  private static Record readRecord(DataInputStream inputStream) throws IOException {
-    Record record = new Record();
-    record.recordId = readString(inputStream);
-    record.family = readString(inputStream);
-    int size = inputStream.readInt();
+  private Term[] readDeleteTermsFromWal(DataInputStream in) throws IOException {
+    int size = in.readInt();
+    Term[] result = new Term[size];
+    TIOStreamTransport transport = new TIOStreamTransport(in);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
     for (int i = 0; i < size; i++) {
-      record.addToColumns(readColumn(inputStream));
+      Term term = new Term();
+      try {
+        term.read(protocol);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+      result[i] = term;
     }
-    return record;
+    return result;
   }
 
-  private static void writeColumn(DataOutputStream outputStream, Column column) throws IOException {
-    writeString(outputStream, column.name);
-    writeString(outputStream, column.value);
-  }
+  private void writeDeleteTermsToWal(Term[] terms) {
 
-  private static Column readColumn(DataInputStream inputStream) throws IOException {
-    Column column = new Column();
-    column.name = readString(inputStream);
-    column.value = readString(inputStream);
-    return column;
   }
 
-  private static void writeDelete(DataOutputStream outputStream, String deleteRowId) throws IOException {
-    writeString(outputStream, deleteRowId);
+  private List<Document> readAddDocumentsFromWal(DataInputStream in) throws IOException {
+    int size = in.readInt();
+    List<Document> result = new ArrayList<Document>(size);
+    TIOStreamTransport transport = new TIOStreamTransport(in);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    for (int i = 0; i < size; i++) {
+      Document document = new Document();
+      try {
+        document.read(protocol);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+      result.add(document);
+    }
+    return result;
   }
 
-  private static void writeString(DataOutputStream outputStream, String s) throws IOException {
-    byte[] bs = s.getBytes();
-    Utils.writeVInt(outputStream, bs.length);
-    outputStream.write(bs);
-  }
+  private void writeAddDocumentsToWal(List<Document> documents) {
 
-  private static String readString(DataInputStream inputStream) throws IOException {
-    int length = Utils.readVInt(inputStream);
-    byte[] buffer = new byte[length];
-    inputStream.readFully(buffer);
-    return new String(buffer);
   }
 
-  private void sync(byte[] bs) throws IOException {
-    if (bs == null || outputStream == null) {
-      throw new RuntimeException("bs [" + bs + "] outputStream [" + outputStream + "]");
-    }
-    FSDataOutputStream os = outputStream.get();
-    os.writeInt(bs.length);
-    os.write(bs);
-    long now = System.nanoTime();
-    if (lastSync.get() + timeBetweenSyncs < now) {
-      os.sync();
-      lastSync.set(now);
+  private byte[] readBuffer(DataInputStream inputStream) {
+    try {
+      int length = inputStream.readInt();
+      byte[] buffer = new byte[length];
+      inputStream.readFully(buffer);
+      return buffer;
+    } catch (IOException e) {
+      if (e instanceof EOFException) {
+        return null;
+      }
+      e.printStackTrace();
     }
+    return null;
   }
 
-  public long replaceRow(boolean wal, Row row, TrackingIndexWriter writer) throws IOException {
-    if (wal) {
-      synchronized (running) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream outputStream = new DataOutputStream(baos);
-        outputStream.writeByte(TYPE.ROW.value());
-        writeRow(outputStream, row);
-        outputStream.close();
-        sync(baos.toByteArray());
-      }
+  private void rollLog() throws IOException {
+    LOG.info("Rolling WAL path [" + walPath + "]");
+    FSDataOutputStream os = outputStream.get();
+    if (os != null) {
+      os.close();
     }
-    Term term = createRowId(row.id);
-    List<Document> docs = getDocs(row, analyzer);
-    return writer.updateDocuments(term, docs);
+    fileSystem.delete(walPath, false);
+    open();
   }
 
-  public long deleteRow(boolean wal, String rowId, TrackingIndexWriter writer) throws IOException {
-    if (wal) {
-      synchronized (running) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream outputStream = new DataOutputStream(baos);
-        outputStream.writeByte(TYPE.DELETE.value());
-        writeDelete(outputStream, rowId);
-        outputStream.close();
-        sync(baos.toByteArray());
-      }
+  public void close() throws IOException {
+    synchronized (running) {
+      running.set(false);
     }
-    return writer.deleteDocuments(createRowId(rowId));
+    outputStream.get().close();
   }
 
   public void setWalPath(Path walPath) {
@@ -325,60 +296,40 @@ public class TransactionRecorder {
     }
   }
 
-  public static List<Document> getDocs(Row row, BlurAnalyzer analyzer) {
-    List<Record> records = row.records;
-    int size = records.size();
-    final String rowId = row.id;
-    final StringBuilder builder = new StringBuilder();
-    List<Document> docs = new ArrayList<Document>(size);
-    for (int i = 0; i < size; i++) {
-      Document document = convert(rowId, records.get(i), builder, analyzer);
-      if (i == 0) {
-        document.add(new Field(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO, Index.NOT_ANALYZED_NO_NORMS));
-      }
-      docs.add(document);
-    }
-    return docs;
-  }
-
-  public static Document convert(String rowId, Record record, StringBuilder builder, BlurAnalyzer analyzer) {
-    Document document = new Document();
-    document.add(new Field(BlurConstants.ROW_ID, rowId, ID_TYPE));
-    document.add(new Field(BlurConstants.RECORD_ID, record.recordId, ID_TYPE));
-    RowIndexWriter.addColumns(document, analyzer, builder, record.family, record.columns);
-    return document;
-  }
-
   public void setAnalyzer(BlurAnalyzer analyzer) {
     this.analyzer = analyzer;
   }
 
-  private Term createRowId(String id) {
-    return new Term(BlurConstants.ROW_ID, id);
-  }
-
   public long addDocuments(boolean wal, List<org.apache.blur.thrift.generated.Document> documents, TrackingIndexWriter writer) throws IOException {
-    List<Document> docs = new ArrayList<Document>();
-    for (org.apache.blur.thrift.generated.Document doc : documents) {
-      docs.add(toLucene(doc));
+    if (wal) {
+      writeAddDocumentsToWal(documents);
     }
-    return writer.addDocuments(docs);
+    return writer.addDocuments(toLucene(documents));
   }
 
-  public long updateDocuments(boolean wal, org.apache.blur.thrift.generated.Term deleteTerm, List<org.apache.blur.thrift.generated.Document> documents, TrackingIndexWriter writer)
-      throws IOException {
-    List<Document> docs = new ArrayList<Document>();
-    for (org.apache.blur.thrift.generated.Document doc : documents) {
-      docs.add(toLucene(doc));
+  public long deleteDocuments(boolean wal, org.apache.blur.thrift.generated.Term[] deleteTerms, TrackingIndexWriter writer) throws IOException {
+    if (wal) {
+      writeDeleteTermsToWal(deleteTerms);
     }
-    return writer.updateDocuments(toLucene(deleteTerm), docs);
-  }
-
-  public long deleteDocuments(boolean wal, org.apache.blur.thrift.generated.Term[] deleteTerm, TrackingIndexWriter writer) throws IOException {
-    return writer.deleteDocuments(toLucene(deleteTerm));
+    return writer.deleteDocuments(toLucene(deleteTerms));
   }
 
   public long deleteDocuments(boolean wal, ByteBuffer[] deleteQueries, TrackingIndexWriter writer) throws IOException {
+    if (wal) {
+      writeDeleteQueriesToWal(deleteQueries);
+    }
     return writer.deleteDocuments(toLucene(deleteQueries));
   }
+
+  public long updateDocuments(boolean wal, List<UpdatePackage> updatePackages, TrackingIndexWriter writer) throws IOException {
+    if (wal) {
+      writeUpdateDocumentsToWal(updatePackages);
+    }
+    long generation = -1;
+    for (UpdatePackage updatePackage : updatePackages) {
+      generation = writer.updateDocuments(toLucene(updatePackage.getTerm()), toLucene(updatePackage.getDocuments()));
+    }
+    return generation;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0b1b602/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index a46a61f..c459732 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -79,6 +79,7 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
 import org.apache.thrift.TException;
 import static org.apache.blur.thrift.lucene.Convert.*;
+
 public class BlurShardServer extends TableAdmin implements Iface {
 
   private static final Log LOG = LogFactory.getLog(BlurShardServer.class);
@@ -603,9 +604,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
     boolean writeAheadLog = options.isWriteAheadLog();
     try {
       BlurIndex index = getIndex(table, shardIndex);
-      for (UpdatePackage updatePackage : updatePackages) {
-        index.updateDocuments(waitToBeVisible, writeAheadLog, updatePackage.getTerm(), updatePackage.getDocuments());
-      }
+      index.updateDocuments(waitToBeVisible, writeAheadLog, updatePackages);
     } catch (Throwable t) {
       LOG.error("Unknown error", t);
       throw new BException(t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a0b1b602/src/blur-core/src/main/java/org/apache/blur/thrift/lucene/Convert.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/lucene/Convert.java b/src/blur-core/src/main/java/org/apache/blur/thrift/lucene/Convert.java
index 9b81d1c..dd44bdf 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/lucene/Convert.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/lucene/Convert.java
@@ -269,7 +269,7 @@ public class Convert {
   public static Term toLucene(org.apache.blur.thrift.generated.Term term) {
     return new Term(term.getField(), new BytesRef(term.getBytes()));
   }
-
+  
   public static Term[] toLucene(org.apache.blur.thrift.generated.Term[] terms) {
     Term[] result = new Term[terms.length];
     for (int i = 0; i < terms.length; i++) {
@@ -278,6 +278,14 @@ public class Convert {
     return result;
   }
   
+  public static List<Document> toLucene(List<org.apache.blur.thrift.generated.Document> documents) throws IOException {
+    List<Document> docs = new ArrayList<Document>();
+    for (org.apache.blur.thrift.generated.Document doc : documents) {
+      docs.add(toLucene(doc));
+    }
+    return docs;
+  }
+  
   public static Query[] toLucene(ByteBuffer[] queries) throws IOException {
     Query[] result = new Query[queries.length];
     for (int i = 0; i < queries.length; i++) {


Mime
View raw message