lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r1151081 - in /lucene/dev/trunk/lucene/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/
Date Tue, 26 Jul 2011 13:07:43 GMT
Author: mikemccand
Date: Tue Jul 26 13:07:42 2011
New Revision: 1151081

URL: http://svn.apache.org/viewvc?rev=1151081&view=rev
Log:
LUCENE-3340: fix cases where buffered deletes fail to be flushed at the right time, resulting
in memory leak

Modified:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1151081&r1=1151080&r2=1151081&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java Tue
Jul 26 13:07:42 2011
@@ -100,7 +100,7 @@ class BufferedDeletesStream {
     numTerms.addAndGet(packet.numTermDeletes);
     bytesUsed.addAndGet(packet.bytesUsed);
     if (infoStream != null) {
-      message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" +
deletes.size());
+      message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" +
deletes.size() + " totBytesUsed=" + bytesUsed.get());
     }
     assert checkDeleteStats();
     return packet.delGen();

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1151081&r1=1151080&r2=1151081&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Jul
26 13:07:42 2011
@@ -141,7 +141,7 @@ final class DocumentsWriter {
       flushPolicy = configuredPolicy;
     }
     flushPolicy.init(this);
-    flushControl = new DocumentsWriterFlushControl(this, config );
+    flushControl = new DocumentsWriterFlushControl(this, config);
   }
 
   synchronized void deleteQueries(final Query... queries) throws IOException {
@@ -309,6 +309,9 @@ final class DocumentsWriter {
   }
 
   private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws
IOException {
+    if (flushControl.doApplyAllDeletes()) {
+      applyAllDeletes(deleteQueue);
+    }
     if (flushingDWPT != null) {
       maybeMerge |= doFlush(flushingDWPT);
     } else {
@@ -443,10 +446,22 @@ final class DocumentsWriter {
         flushControl.doAfterFlush(flushingDWPT);
         flushingDWPT.checkAndResetHasAborted();
         indexWriter.flushCount.incrementAndGet();
+        indexWriter.doAfterFlush();
       }
      
       flushingDWPT = flushControl.nextPendingFlush();
     }
+
+    // If deletes alone are consuming > 1/2 our RAM
+    // buffer, force them all to apply now. This is to
+    // prevent too-frequent flushing of a long tail of
+    // tiny segments:
+    final double ramBufferSizeMB = indexWriter.getConfig().getRAMBufferSizeMB();
+    if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+        flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
+      applyAllDeletes(deleteQueue);
+    }
+
     return maybeMerge;
   }
 
@@ -601,5 +616,4 @@ final class DocumentsWriter {
     }
     return true;
   }
- 
 }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1151081&r1=1151080&r2=1151081&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
(original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
Tue Jul 26 13:07:42 2011
@@ -339,7 +339,11 @@ public final class DocumentsWriterFlushC
    * Returns the number of delete terms in the global pool
    */
   public int getNumGlobalTermDeletes() {
-    return documentsWriter.deleteQueue.numGlobalTermDeletes();
+    return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
+  }
+  
+  public long getDeleteBytesUsed() {
+    return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
   }
 
   synchronized int numFlushingDWPT() {

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java?rev=1151081&r1=1151080&r2=1151081&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
(original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
Tue Jul 26 13:07:42 2011
@@ -60,15 +60,11 @@ public class FlushByRamOrCountsPolicy ex
       }
     }
     final DocumentsWriter writer = this.writer.get();
-    // If deletes alone are consuming > 1/2 our RAM
-    // buffer, force them all to apply now. This is to
-    // prevent too-frequent flushing of a long tail of
-    // tiny segments:
     if ((flushOnRAM() &&
-        writer.deleteQueue.bytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()/2)))
{
+        control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB())))
{
       control.setApplyAllDeletes();
      if (writer.infoStream != null) {
-       writer.message("force apply deletes bytesUsed=" +  writer.deleteQueue.bytesUsed()
+ " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
+       writer.message("force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + "
vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
      }
    }
   }
@@ -82,8 +78,12 @@ public class FlushByRamOrCountsPolicy ex
       control.setFlushPending(state);
     } else if (flushOnRAM()) {// flush by RAM
       final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
-      final long totalRam = control.activeBytes();
+      final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
       if (totalRam >= limit) {
+        final DocumentsWriter writer = this.writer.get();
+        if (writer.infoStream != null) {
+          writer.message("flush: activeBytes=" + control.activeBytes() + " deleteBytes="
+ control.getDeleteBytesUsed() + " vs limit=" + limit);
+        }
         markLargestWriterPending(control, state, totalRam);
       }
     }

Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java?rev=1151081&r1=1151080&r2=1151081&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Tue
Jul 26 13:07:42 2011
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.MockAnalyzer;
@@ -944,4 +946,136 @@ public class TestIndexWriterDelete exten
     w.close();
     dir.close();
   }
+
+  // LUCENE-3340: make sure deletes that we don't apply
+  // during flush (ie are just pushed into the stream) are
+  // in fact later flushed due to their RAM usage:
+  public void testFlushPushedDeletesByRAM() throws Exception {
+    Directory dir = newDirectory();
+    // Cannot use RandomIndexWriter because we don't want to
+    // ever call commit() for this test:
+    IndexWriter w = new IndexWriter(dir,
+                                    newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
+                                    .setRAMBufferSizeMB(1.0f).setMaxBufferedDocs(1000).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false));
+    w.setInfoStream(VERBOSE ? System.out : null);
+    int count = 0;
+    while(true) {
+      Document doc = new Document();
+      doc.add(new Field("id", count+"", Field.Store.NO, Field.Index.NOT_ANALYZED));
+      final Term delTerm;
+      if (count == 1010) {
+        // This is the only delete that applies
+        delTerm = new Term("id", ""+0);
+      } else {
+        // These get buffered, taking up RAM, but delete
+        // nothing when applied:
+        delTerm = new Term("id", "x" + count);
+      }
+      w.updateDocument(delTerm, doc);
+      // Eventually segment 0 should get a del docs:
+      if (dir.fileExists("_0_1.del")) {
+        if (VERBOSE) {
+          System.out.println("TEST: deletes created @ count=" + count);
+        }
+        break;
+      }
+      count++;
+
+      // Today we applyDelets @ count=7199; even if we make
+      // sizable improvements to RAM efficiency of buffered
+      // del term we're unlikely to go over 100K:
+      if (count > 100000) {
+        fail("delete's were not applied");
+      }
+    }
+    w.close();
+    dir.close();
+  }
+
+  // LUCENE-3340: make sure deletes that we don't apply
+  // during flush (ie are just pushed into the stream) are
+  // in fact later flushed due to their RAM usage:
+  public void testFlushPushedDeletesByCount() throws Exception {
+    Directory dir = newDirectory();
+    // Cannot use RandomIndexWriter because we don't want to
+    // ever call commit() for this test:
+    final int flushAtDelCount = atLeast(1020);
+    IndexWriter w = new IndexWriter(dir,
+                                    newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
+                                    setMaxBufferedDeleteTerms(flushAtDelCount).setMaxBufferedDocs(1000).setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false));
+    w.setInfoStream(VERBOSE ? System.out : null);
+    int count = 0;
+    while(true) {
+      Document doc = new Document();
+      doc.add(new Field("id", count+"", Field.Store.NO, Field.Index.NOT_ANALYZED));
+      final Term delTerm;
+      if (count == 1010) {
+        // This is the only delete that applies
+        delTerm = new Term("id", ""+0);
+      } else {
+        // These get buffered, taking up RAM, but delete
+        // nothing when applied:
+        delTerm = new Term("id", "x" + count);
+      }
+      w.updateDocument(delTerm, doc);
+      // Eventually segment 0 should get a del docs:
+      if (dir.fileExists("_0_1.del")) {
+        break;
+      }
+      count++;
+      if (count > flushAtDelCount) {
+        fail("delete's were not applied at count=" + flushAtDelCount);
+      }
+    }
+    w.close();
+    dir.close();
+  }
+
+  // Make sure buffered (pushed) deletes don't use up so
+  // much RAM that it forces long tail of tiny segments:
+  public void testApplyDeletesOnFlush() throws Exception {
+    Directory dir = newDirectory();
+    // Cannot use RandomIndexWriter because we don't want to
+    // ever call commit() for this test:
+    final AtomicInteger docsInSegment = new AtomicInteger();
+    final AtomicBoolean closing = new AtomicBoolean();
+    final AtomicBoolean sawAfterFlush = new AtomicBoolean();
+    IndexWriter w = new IndexWriter(dir,
+                                    newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
+                                    setRAMBufferSizeMB(0.5).setMaxBufferedDocs(-1).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false))
{
+        @Override
+        public void doAfterFlush() {
+          assertTrue("only " + docsInSegment.get() + " in segment", closing.get() || docsInSegment.get()
>= 10);
+          docsInSegment.set(0);
+          sawAfterFlush.set(true);
+        }
+      };
+    w.setInfoStream(VERBOSE ? System.out : null);
+    int id = 0;
+    while(true) {
+      StringBuilder sb = new StringBuilder();
+      for(int termIDX=0;termIDX<100;termIDX++) {
+        sb.append(' ').append(_TestUtil.randomRealisticUnicodeString(random));
+      }
+      if (id == 500) {
+        w.deleteDocuments(new Term("id", "0"));
+      }
+      Document doc = new Document();
+      doc.add(newField("id", ""+id, Field.Index.NOT_ANALYZED));
+      doc.add(newField("body", sb.toString(), Field.Index.ANALYZED));
+      w.updateDocument(new Term("id", ""+id), doc);
+      docsInSegment.incrementAndGet();
+      if (dir.fileExists("_0_1.del")) {
+        if (VERBOSE) {
+          System.out.println("TEST: deletes created @ id=" + id);
+        }
+        break;
+      }
+      id++;
+    }
+    closing.set(true);
+    assertTrue(sawAfterFlush.get());
+    w.close();
+    dir.close();
+  }
 }



Mime
View raw message