lucene-commits mailing list archives

From sim...@apache.org
Subject lucene-solr:branch_7x: LUCENE-8317: Prevent concurrent deletes from being applied during full flush
Date Thu, 17 May 2018 12:11:55 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x b178321bd -> 922fd2685


LUCENE-8317: Prevent concurrent deletes from being applied during full flush

Future deletes could potentially be exposed to flushes/commits/refreshes if the
amount of RAM used by deletes is greater than half of the IW RAM buffer.
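
The scenario is easiest to reproduce with a tiny RAM buffer, where buffered deletes cross the half-buffer threshold almost immediately and the flush policy keeps asking for deletes to be applied while a near-real-time refresh is in flight. The following standalone sketch is not part of this commit; it is modeled on the testUpdateSingleDocWithThreads test added below, and the class and variable names are illustrative only. It updates a single document from a background thread while repeatedly reopening an NRT reader; with the happens-before guarantee this change enforces, every refresh must see exactly one live document.

import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class Lucene8317Sketch {
  public static void main(String[] args) throws Exception {
    IndexWriterConfig config = new IndexWriterConfig()          // default analyzer; the id field is opaque anyway
        .setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH)
        .setRAMBufferSizeMB(0.00001);                           // deletes exceed half of the IW RAM buffer almost immediately
    try (Directory dir = FSDirectory.open(Files.createTempDirectory("lucene8317"));
         IndexWriter writer = new IndexWriter(dir, config)) {
      Document doc = new Document();
      doc.add(new StringField("id", "1", Field.Store.NO));
      writer.updateDocument(new Term("id", "1"), doc);          // exactly one live document at all times
      AtomicBoolean done = new AtomicBoolean(false);
      Thread updater = new Thread(() -> {
        try {
          for (int i = 0; i < 1000; i++) {                      // concurrent updates of the same id keep producing deletes
            Document d = new Document();
            d.add(new StringField("id", "1", Field.Store.NO));
            writer.updateDocument(new Term("id", "1"), d);
          }
        } catch (Exception e) {
          throw new AssertionError(e);
        } finally {
          done.set(true);
        }
      });
      updater.start();
      DirectoryReader reader = DirectoryReader.open(writer);    // NRT reader; reopening it triggers a full flush
      try {
        while (done.get() == false) {
          DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
          if (newReader != null) {
            reader.close();
            reader = newReader;
          }
          // an update replaces the previous document atomically, so a refresh must never observe 0 (or 2) docs
          if (reader.numDocs() != 1) {
            throw new AssertionError("expected exactly one live doc but saw " + reader.numDocs());
          }
        }
      } finally {
        reader.close();
        updater.join();
      }
    }
  }
}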


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/922fd268
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/922fd268
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/922fd268

Branch: refs/heads/branch_7x
Commit: 922fd26859cd1e288c8e9ed0d1f22bf75306de90
Parents: b178321
Author: Simon Willnauer <simonw@apache.org>
Authored: Wed May 16 17:34:51 2018 +0200
Committer: Simon Willnauer <simonw@apache.org>
Committed: Thu May 17 14:10:50 2018 +0200

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |  4 ++
 .../apache/lucene/index/DocumentsWriter.java    | 17 ++++++-
 .../lucene/index/FlushByRamOrCountsPolicy.java  |  5 +-
 .../apache/lucene/index/TestIndexWriter.java    | 48 ++++++++++++++++++
 .../index/TestIndexWriterWithThreads.java       | 51 ++++++++++++++++++++
 5 files changed, 122 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/922fd268/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 6d47421..278ba9b 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -124,6 +124,10 @@ Bug Fixes
 * LUCENE-8287: Ensure that empty regex completion queries always return no results.
   (Julie Tibshirani via Jim Ferenczi)
 
+* LUCENE-8317: Prevent concurrent deletes from being applied during full flush.
+  Future deletes could potentially be exposed to flushes/commits/refreshes if the 
+  amount of RAM used by deletes is greater than half of the IW RAM buffer. (Simon Willnauer)
+
 Other
 
 * LUCENE-8301: Update randomizedtesting to 2.6.0. (Dawid Weiss)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/922fd268/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index cbf4d6d..cbbf22e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -181,8 +181,10 @@ final class DocumentsWriter implements Closeable, Accountable {
   
   /** If buffered deletes are using too much heap, resolve them and write disk and return true. */
   private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
-    if (flushControl.getAndResetApplyAllDeletes()) {
+    if (flushControl.isFullFlush() == false // never apply deletes during full flush this breaks happens before relationship
+        && flushControl.getAndResetApplyAllDeletes()) {
       if (deleteQueue != null) {
+        assert assertTicketQueueModification(deleteQueue);
         ticketQueue.addDeletes(deleteQueue);
       }
       flushNotifications.onDeletesApplied(); // apply deletes event forces a purge
@@ -542,6 +544,7 @@ final class DocumentsWriter implements Closeable, Accountable {
          * might miss to deletes documents in 'A'.
          */
         try {
+          assert assertTicketQueueModification(flushingDWPT.deleteQueue);
           // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
           ticket = ticketQueue.addFlushTicket(flushingDWPT);
           final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
@@ -673,6 +676,14 @@ final class DocumentsWriter implements Closeable, Accountable {
     currentFullFlushDelQueue = session;
     return true;
   }
+
+  private boolean assertTicketQueueModification(DocumentsWriterDeleteQueue deleteQueue) {
+    // assign it then we don't need to sync on DW
+    DocumentsWriterDeleteQueue currentFullFlushDelQueue = this.currentFullFlushDelQueue;
+    assert currentFullFlushDelQueue == null || currentFullFlushDelQueue == deleteQueue:
+        "only modifications from the current flushing queue are permitted while doing a full flush";
+    return true;
+  }
   
   /*
    * FlushAllThreads is synced by IW fullFlushLock. Flushing all threads is a
@@ -713,6 +724,7 @@ final class DocumentsWriter implements Closeable, Accountable {
         if (infoStream.isEnabled("DW")) {
           infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
         }
+        assertTicketQueueModification(flushingDeleteQueue);
         ticketQueue.addDeletes(flushingDeleteQueue);
       }
       // we can't assert that we don't have any tickets in teh queue since we might add a DocumentsWriterDeleteQueue
@@ -728,7 +740,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
   }
   
-  void finishFullFlush(boolean success) {
+  void finishFullFlush(boolean success) throws IOException {
     try {
       if (infoStream.isEnabled("DW")) {
         infoStream.message("DW", Thread.currentThread().getName() + " finishFullFlush success=" + success);
@@ -742,6 +754,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       }
     } finally {
       pendingChangesInCurrentFullFlush = false;
+      applyAllDeletes(deleteQueue); // make sure we do execute this since we block applying deletes during full flush
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/922fd268/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
index ffd9501..6620aef 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
@@ -92,7 +92,10 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
    */
   protected void markLargestWriterPending(DocumentsWriterFlushControl control,
       ThreadState perThreadState, final long currentBytesPerThread) {
-    control.setFlushPending(findLargestNonPendingWriter(control, perThreadState));
+    ThreadState largestNonPendingWriter = findLargestNonPendingWriter(control, perThreadState);
+    if (largestNonPendingWriter != null) {
+      control.setFlushPending(largestNonPendingWriter);
+    }
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/922fd268/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 20dbb2f..c93007a 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -78,6 +78,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.BaseDirectoryWrapper;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
@@ -3289,4 +3290,51 @@ public class TestIndexWriter extends LuceneTestCase {
     }
     IOUtils.close(reader, writer, dir);
   }
+
+  public void testDeleteHappensBeforeWhileFlush() throws IOException, InterruptedException {
+    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch inFlush = new CountDownLatch(1);
+    try (Directory dir = new FilterDirectory(newDirectory()) {
+      @Override
+      public IndexOutput createOutput(String name, IOContext context) throws IOException {
+        StackTraceElement[] trace = new Exception().getStackTrace();
+        for (int i = 0; i < trace.length; i++) {
+          if ("flush".equals(trace[i].getMethodName()) && DefaultIndexingChain.class.getName().equals(trace[i].getClassName())) {
+            try {
+              inFlush.countDown();
+              latch.await();
+            } catch (InterruptedException e) {
+              throw new AssertionError(e);
+            }
+          }
+        }
+        return super.createOutput(name, context);
+      }
+    }; IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig())) {
+      Document document = new Document();
+      document.add(new StringField("id", "1", Field.Store.YES));
+      writer.addDocument(document);
+      Thread t = new Thread(() -> {
+        try {
+          inFlush.await();
+          writer.docWriter.flushControl.setApplyAllDeletes();
+          if (random().nextBoolean()) {
+            writer.updateDocument(new Term("id", "1"), document);
+          } else {
+            writer.deleteDocuments(new Term("id", "1"));
+          }
+
+        } catch (Exception e) {
+          throw new AssertionError(e);
+        } finally {
+          latch.countDown();
+        }
+      });
+      t.start();
+      try (IndexReader reader = writer.getReader()) {
+        assertEquals(1, reader.numDocs());
+      };
+      t.join();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/922fd268/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
index fadbc32..6e9645b 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -30,6 +31,7 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -655,4 +657,53 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
     writerRef.get().close();
     d.close();
   }
+
+  public void testUpdateSingleDocWithThreads() throws IOException, BrokenBarrierException, InterruptedException {
+    try (Directory dir = newDirectory();
+         RandomIndexWriter writer = new RandomIndexWriter(random(), dir,
+             newIndexWriterConfig().setMaxBufferedDocs(-1).setRAMBufferSizeMB(0.00001))) {
+      Thread[] threads = new Thread[3 + random().nextInt(3)];
+      AtomicInteger done = new AtomicInteger(0);
+      CyclicBarrier barrier = new CyclicBarrier(threads.length + 1);
+      Document doc = new Document();
+      doc.add(new StringField("id", "1", Field.Store.NO));
+      writer.updateDocument(new Term("id", "1"), doc);
+      int itersPerThread = 100 + random().nextInt(2000);
+      for (int i = 0; i < threads.length; i++) {
+        threads[i] = new Thread(() -> {
+          try {
+            barrier.await();
+            for (int iters = 0; iters < itersPerThread; iters++) {
+              Document d = new Document();
+              d.add(new StringField("id", "1", Field.Store.NO));
+              writer.updateDocument(new Term("id", "1"), d);
+            }
+          } catch (Exception e) {
+            throw new AssertionError(e);
+          } finally {
+            done.incrementAndGet();
+          }
+        });
+        threads[i].start();
+      }
+      DirectoryReader open = DirectoryReader.open(writer.w);
+      assertEquals(open.numDocs(), 1);
+      barrier.await();
+      try {
+        do {
+          DirectoryReader newReader = DirectoryReader.openIfChanged(open);
+          if (newReader != null) {
+            open.close();
+            open = newReader;
+          }
+          assertEquals(open.numDocs(), 1);
+        } while (done.get() < threads.length);
+      } finally {
+        open.close();
+        for (int i = 0; i < threads.length; i++) {
+          threads[i].join();
+        }
+      }
+    }
+  }
 }

