lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject [09/19] lucene-solr:master: sequence numbers: always increment seq no (even for addDocument/s); add tests; add javadocs; make DWDQ's seqNo private
Date Sun, 05 Jun 2016 09:51:10 GMT
sequence numbers: always increment seq no (even for addDocument/s); add tests; add javadocs;
make DWDQ's seqNo private


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/59311a44
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/59311a44
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/59311a44

Branch: refs/heads/master
Commit: 59311a445c1552340ba25a0e3e45bac55b3abdbd
Parents: 34673ad
Author: Mike McCandless <mikemccand@apache.org>
Authored: Fri May 27 06:11:07 2016 -0400
Committer: Mike McCandless <mikemccand@apache.org>
Committed: Fri May 27 06:11:07 2016 -0400

----------------------------------------------------------------------
 .../apache/lucene/index/DocumentsWriter.java    |   2 +-
 .../index/DocumentsWriterDeleteQueue.java       |  22 ++-
 .../index/DocumentsWriterFlushControl.java      |   4 +-
 .../lucene/index/DocumentsWriterPerThread.java  |   4 +-
 .../org/apache/lucene/index/IndexWriter.java    |  90 +++++++++--
 .../index/TestIndexingSequenceNumbers.java      | 155 ++++++++++++++++++-
 6 files changed, 249 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 9f1bdd3..5630fbb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -259,7 +259,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       deleteQueue.clear();
 
       // jump over any possible in flight ops:
-      deleteQueue.seqNo.addAndGet(perThreadPool.getActiveThreadStateCount()+1);
+      deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount()+1);
 
       flushControl.abortPendingFlushes();
       flushControl.waitForFlush();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index 80d2c85..4a11599 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -84,7 +84,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
   final long generation;
 
   /** Generates the sequence number that IW returns to callers changing the index, showing
the effective serialization of all operations. */
-  final AtomicLong seqNo;
+  private final AtomicLong nextSeqNo;
   
   DocumentsWriterDeleteQueue() {
     // seqNo must start at 1 because some APIs negate this to also return a boolean
@@ -98,7 +98,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
   DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long
startSeqNo) {
     this.globalBufferedUpdates = globalBufferedUpdates;
     this.generation = generation;
-    this.seqNo = new AtomicLong(startSeqNo);
+    this.nextSeqNo = new AtomicLong(startSeqNo);
     /*
      * we use a sentinel instance as our initial tail. No slice will ever try to
      * apply this tail since the head is always omitted.
@@ -168,10 +168,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
           /*
            * now that we are done we need to advance the tail
            */
-          long mySeqNo = seqNo.getAndIncrement();
+          long seqNo = getNextSequenceNumber();
           boolean result = tailUpdater.compareAndSet(this, currentTail, newNode);
           assert result;
-          return mySeqNo;
+          return seqNo;
         }
       }
     }
@@ -460,6 +460,16 @@ final class DocumentsWriterDeleteQueue implements Accountable {
   public String toString() {
     return "DWDQ: [ generation: " + generation + " ]";
   }
-  
-  
+
+  public long getNextSequenceNumber() {
+    return nextSeqNo.getAndIncrement();
+  }  
+
+  public long getLastSequenceNumber() {
+    return nextSeqNo.get()-1;
+  }  
+
+  public void skipSequenceNumbers(long jump) {
+    nextSeqNo.addAndGet(jump);
+  }  
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index ffcb7dc..99bf8d8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -481,9 +481,9 @@ final class DocumentsWriterFlushControl implements Accountable {
       // we do another full flush
       //System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get()
+ " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
 
-      // Insert a gap in seqNo of current active thread count, in the worst case those threads
now have one operation in flight.  It's fine
+      // Insert a gap in seqNo of current active thread count, in the worst case each of
those threads now has one operation in flight.  It's fine
       // if we have some sequence numbers that were never assigned:
-      seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
+      seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount()
+ 2;
 
       DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1,
seqNo+1);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 5b1afa0..cf5694d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -292,7 +292,7 @@ class DocumentsWriterPerThread {
         deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
         return seqNo;
       } else {
-        seqNo = deleteQueue.seqNo.get();
+        seqNo = deleteQueue.getNextSequenceNumber();
       }
 
       return seqNo;
@@ -328,7 +328,7 @@ class DocumentsWriterPerThread {
       assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
     } else  {
       applySlice &= deleteQueue.updateSlice(deleteSlice);
-      seqNo = deleteQueue.seqNo.get();
+      seqNo = deleteQueue.getNextSequenceNumber();
     }
     
     if (applySlice) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 945399c..2bdfea7 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -95,6 +95,14 @@ import org.apache.lucene.util.Version;
   and then adds the entire document). When finished adding, deleting 
   and updating documents, {@link #close() close} should be called.</p>
 
+  <a name="sequence_number"></a>
+  <p>Each method that changes the index returns a {@code long} sequence number, which
+  expresses the effective order in which each change was applied.
+  {@link #commit} also returns a sequence number, describing which
+  changes are in the commit point and which are not.  Sequence numbers
+  are transient (not saved into the index in any way) and only valid
+  within a single {@code IndexWriter} instance.</p>
+
   <a name="flush"></a>
   <p>These changes are buffered in memory and periodically
   flushed to the {@link Directory} (during the above method
@@ -1288,6 +1296,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * replaced with the Unicode replacement character
    * U+FFFD.</p>
    *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
@@ -1327,6 +1338,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * and will likely break them up.  Use such tools at your
    * own risk!
    *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    *
@@ -1344,6 +1358,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    *
    * See {@link #addDocuments(Iterable)}.
    *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    *
@@ -1441,7 +1458,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
           }
           //System.out.println("  yes " + info.info.name + " " + docID);
 
-          return docWriter.deleteQueue.seqNo.getAndIncrement();
+          return docWriter.deleteQueue.getNextSequenceNumber();
         }
       } else {
         //System.out.println("  no rld " + info.info.name + " " + docID);
@@ -1458,6 +1475,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * terms. All given deletes are applied and flushed atomically
    * at the same time.
    *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @param terms array of terms to identify the documents
    * to be deleted
    * @throws CorruptIndexException if the index is corrupt
@@ -1484,6 +1504,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * Deletes the document(s) matching any of the provided queries.
    * All given deletes are applied and flushed atomically at the same time.
    *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @param queries array of queries to identify the documents
    * to be deleted
    * @throws CorruptIndexException if the index is corrupt
@@ -1522,6 +1545,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * by a reader on the same index (flush may happen only after
    * the add).
    *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @param term the term to identify the document(s) to be
    * deleted
    * @param doc the document to be added
@@ -1566,6 +1592,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    *          field name of the {@link NumericDocValues} field
    * @param value
    *          new value for the field
+   *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @throws CorruptIndexException
    *           if the index is corrupt
    * @throws IOException
@@ -1606,6 +1636,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    *          field name of the {@link BinaryDocValues} field
    * @param value
    *          new value for the field
+   *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @throws CorruptIndexException
    *           if the index is corrupt
    * @throws IOException
@@ -1642,6 +1676,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * 
    * @param updates
    *          the updates to apply
+   *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @throws CorruptIndexException
    *           if the index is corrupt
    * @throws IOException
@@ -2256,6 +2294,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * threads are running {@link #forceMerge}, {@link #addIndexes(CodecReader[])}
    * or {@link #forceMergeDeletes} methods, they may receive
    * {@link MergePolicy.MergeAbortedException}s.
+   *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
    */
   public long deleteAll() throws IOException {
     ensureOpen();
@@ -2304,7 +2345,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
             globalFieldNumberMap.clear();
 
             success = true;
-            return docWriter.deleteQueue.seqNo.get();
+            return docWriter.deleteQueue.getNextSequenceNumber();
 
           } finally {
             docWriter.unlockAllAfterAbortAll(this);
@@ -2542,6 +2583,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    *
    * <p>This requires this index not be among those to be added.
    *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    * @throws IllegalArgumentException if addIndexes would cause
@@ -2559,6 +2603,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
 
     boolean successTop = false;
 
+    long seqNo;
+
     try {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "flush at addIndexes(Directory...)");
@@ -2630,6 +2676,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
           // Now reserve the docs, just before we update SIS:
           reserveDocs(totalMaxDoc);
 
+          seqNo = docWriter.deleteQueue.getNextSequenceNumber();
+
           success = true;
         } finally {
           if (!success) {
@@ -2647,6 +2695,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
 
     } catch (VirtualMachineError tragedy) {
       tragicEvent(tragedy, "addIndexes(Directory...)");
+      // dead code but javac disagrees:
+      seqNo = -1;
     } finally {
       if (successTop) {
         IOUtils.close(locks);
@@ -2656,8 +2706,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
     }
     maybeMerge();
 
-    // no need to increment:
-    return docWriter.deleteQueue.seqNo.get();
+    return seqNo;
   }
   
   /**
@@ -2682,6 +2731,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * {@code maxMergeAtOnce} parameter, you should pass that many readers in one
    * call.
    * 
+   * @return The <a href="#sequence_number">sequence number</a>
+   * for this operation
+   *
    * @throws CorruptIndexException
    *           if the index is corrupt
    * @throws IOException
@@ -2697,6 +2749,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
 
     Sort indexSort = config.getIndexSort();
 
+    long seqNo;
+
     try {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "flush at addIndexes(CodecReader...)");
@@ -2731,8 +2785,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
       rateLimiters.set(new MergeRateLimiter(null));
 
       if (!merger.shouldMerge()) {
-        // no need to increment:
-        return docWriter.deleteQueue.seqNo.get();
+        return docWriter.deleteQueue.getNextSequenceNumber();
       }
 
       merger.merge();                // merge 'em
@@ -2751,8 +2804,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
           // Safe: these files must exist
           deleteNewFiles(infoPerCommit.files());
 
-          // no need to increment:
-          return docWriter.deleteQueue.seqNo.get();
+          return docWriter.deleteQueue.getNextSequenceNumber();
         }
         ensureOpen();
         useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@@ -2788,8 +2840,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
           // Safe: these files must exist
           deleteNewFiles(infoPerCommit.files());
 
-          // no need to increment:
-          return docWriter.deleteQueue.seqNo.get();
+          return docWriter.deleteQueue.getNextSequenceNumber();
         }
         ensureOpen();
 
@@ -2797,15 +2848,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
         reserveDocs(numDocs);
       
         segmentInfos.add(infoPerCommit);
+        seqNo = docWriter.deleteQueue.getNextSequenceNumber();
         checkpoint();
       }
     } catch (VirtualMachineError tragedy) {
       tragicEvent(tragedy, "addIndexes(CodecReader...)");
+      // dead code but javac disagrees:
+      seqNo = -1;
     }
     maybeMerge();
 
-    // no need to increment:
-    return docWriter.deleteQueue.seqNo.get();
+    return seqNo;
   }
 
   /** Copies the segment files as-is into the IndexWriter's directory. */
@@ -2873,6 +2926,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * <p>You can also just call {@link #commit()} directly
    *  without prepareCommit first in which case that method
    *  will internally call prepareCommit.
+   *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * of the last operation in the commit.  All sequence numbers &lt;= this value
+   * will be reflected in the commit, and all others will not.
    */
   @Override
   public final long prepareCommit() throws IOException {
@@ -3069,6 +3126,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
    * point, and all other operations will not. </p>
    *
    * @see #prepareCommit
+   *
+   * @return The <a href="#sequence_number">sequence number</a>
+   * of the last operation in the commit.  All sequence numbers &lt;= this value
+   * will be reflected in the commit, and all others will not.
    */
   @Override
   public final long commit() throws IOException {
@@ -4988,11 +5049,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable
{
     };
   }
 
-  /** Returns the last sequence number.
+  /** Returns the last <a href="#sequence_number">sequence number</a>, or 0
+   *  if no index-changing operations have completed yet.
    *
    * @lucene.experimental */
   public long getLastSequenceNumber() {
     ensureOpen();
-    return docWriter.deleteQueue.seqNo.get()-1;
+    return docWriter.deleteQueue.getLastSequenceNumber();
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index fb9b9ab..002292c 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -43,7 +43,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
     IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
     long a = w.addDocument(new Document());
     long b = w.addDocument(new Document());
-    assertTrue(b >= a);
+    assertTrue(b > a);
     w.close();
     dir.close();
   }
@@ -129,7 +129,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
   }
 
   static class Operation {
-    // 0 = update, 1 = delete, 2 = commit
+    // 0 = update, 1 = delete, 2 = commit, 3 = add
     byte what;
     int id;
     int threadID;
@@ -248,7 +248,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
             }
           }
 
-          assertTrue(op.seqNo >= lastSeqNo);
+          assertTrue(op.seqNo > lastSeqNo);
           lastSeqNo = op.seqNo;
         }
       }
@@ -293,4 +293,153 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
 
     dir.close();
   }
+
+  public void testStressConcurrentAddAndDeleteAndCommit() throws Exception {
+    final int opCount = atLeast(10000);
+    final int idCount = TestUtil.nextInt(random(), 10, 1000);
+
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = newIndexWriterConfig();
+    iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+
+    // Cannot use RIW since it randomly commits:
+    final IndexWriter w = new IndexWriter(dir, iwc);
+
+    final int numThreads = TestUtil.nextInt(random(), 2, 5);
+    Thread[] threads = new Thread[numThreads];
+    //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount
+ " threadCount=" + threads.length);
+    final CountDownLatch startingGun = new CountDownLatch(1);
+    List<List<Operation>> threadOps = new ArrayList<>();
+
+    Object commitLock = new Object();
+    final List<Operation> commits = new ArrayList<>();
+
+    // multiple threads add and delete documents over the same set of ids, and we randomly commit
+    for(int i=0;i<threads.length;i++) {
+      final List<Operation> ops = new ArrayList<>();
+      threadOps.add(ops);
+      final int threadID = i;
+      threads[i] = new Thread() {
+          @Override
+          public void run() {
+            try {
+              startingGun.await();
+              for(int i=0;i<opCount;i++) {
+                Operation op = new Operation();
+                op.threadID = threadID;
+                if (random().nextInt(500) == 17) {
+                  op.what = 2;
+                  synchronized(commitLock) {
+                    op.seqNo = w.commit();
+                    if (op.seqNo != -1) {
+                      commits.add(op);
+                    }
+                  }
+                } else {
+                  op.id = random().nextInt(idCount);
+                  Term idTerm = new Term("id", "" + op.id);
+                  if (random().nextInt(10) == 1) {
+                    op.what = 1;
+                    if (random().nextBoolean()) {
+                      op.seqNo = w.deleteDocuments(idTerm);
+                    } else {
+                      op.seqNo = w.deleteDocuments(new TermQuery(idTerm));
+                    }
+                  } else {
+                    Document doc = new Document();
+                    doc.add(new StoredField("thread", threadID));
+                    doc.add(new StringField("id", "" + op.id, Field.Store.NO));
+                    if (random().nextBoolean()) {
+                      List<Document> docs = new ArrayList<>();
+                      docs.add(doc);
+                      op.seqNo = w.addDocuments(docs);
+                    } else {
+                      op.seqNo = w.addDocument(doc);
+                    }
+                    op.what = 3;
+                  }
+                  ops.add(op);
+                }
+              }
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      threads[i].start();
+    }
+    startingGun.countDown();
+    for(Thread thread : threads) {
+      thread.join();
+    }
+
+    Operation commitOp = new Operation();
+    commitOp.seqNo = w.commit();
+    if (commitOp.seqNo != -1) {
+      commits.add(commitOp);
+    }
+
+    List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
+    assertEquals(commits.size(), indexCommits.size());
+
+    // how many docs with this id are expected:
+    int[] expectedCounts = new int[idCount];
+    long[] lastDelSeqNos = new long[idCount];
+      
+    //System.out.println("TEST: " + commits.size() + " commits");
+    for(int i=0;i<commits.size();i++) {
+      // this commit point should reflect all operations <= this seqNo
+      long commitSeqNo = commits.get(i).seqNo;
+      //System.out.println("  commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
+
+      // first find the highest seqNo of the last delete op, for each id, prior to this commit:
+      Arrays.fill(lastDelSeqNos, -1);
+      for(int threadID=0;threadID<threadOps.size();threadID++) {
+        long lastSeqNo = 0;
+        for(Operation op : threadOps.get(threadID)) {
+          if (op.what == 1 && op.seqNo <= commitSeqNo && op.seqNo >
lastDelSeqNos[op.id]) {
+            lastDelSeqNos[op.id] = op.seqNo;
+          }
+
+          // within one thread the seqNos must only increase:
+          assertTrue(op.seqNo > lastSeqNo);
+          lastSeqNo = op.seqNo;
+        }
+      }
+
+      // then count how many adds happened since the last delete and before this commit:
+      Arrays.fill(expectedCounts, 0);
+      for(int threadID=0;threadID<threadOps.size();threadID++) {
+        for(Operation op : threadOps.get(threadID)) {
+          if (op.what == 3 && op.seqNo <= commitSeqNo && op.seqNo >
lastDelSeqNos[op.id]) {
+            expectedCounts[op.id]++;
+          }
+        }
+      }
+
+      DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
+      IndexSearcher s = new IndexSearcher(r);
+
+      for(int id=0;id<idCount;id++) {
+        //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
+        assertEquals(expectedCounts[id], s.count(new TermQuery(new Term("id", ""+id))));
+      }
+      w.close();
+      r.close();
+    }
+
+    dir.close();
+  }
+
+  public void testDeleteAll() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+    long a = w.addDocument(new Document());
+    long b = w.deleteAll();
+    assertTrue(a < b);
+    long c = w.commit();
+    assertTrue(b < c);
+    w.close();
+    dir.close();
+  }
 }


Mime
View raw message