lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r1408092 - in /lucene/dev/branches/branch_4x: ./ lucene/ lucene/core/ lucene/core/src/java/org/apache/lucene/index/ lucene/core/src/test/org/apache/lucene/index/
Date Sun, 11 Nov 2012 19:34:18 GMT
Author: mikemccand
Date: Sun Nov 11 19:34:17 2012
New Revision: 1408092

URL: http://svn.apache.org/viewvc?rev=1408092&view=rev
Log:
LUCENE-4544: fix off-by-one in max number of CMS merge threads

Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/lucene/   (props changed)
    lucene/dev/branches/branch_4x/lucene/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/lucene/core/   (props changed)
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
    lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
    lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java

Modified: lucene/dev/branches/branch_4x/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/CHANGES.txt?rev=1408092&r1=1408091&r2=1408092&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/lucene/CHANGES.txt Sun Nov 11 19:34:17 2012
@@ -128,6 +128,10 @@ Bug Fixes
   not synced. Instead, it now tracks an 'epoch' version, which is incremented
   whenever the taxonomy is re-created, or replaced. (Shai Erera)
 
+* LUCENE-4544: Fixed off-by-1 in ConcurrentMergeScheduler that would
+  allow 1+maxMergeCount merge threads to be created, instead of just
+  maxMergeCount (Radim Kolar, Mike McCandless)
+
 Optimizations
 
 * LUCENE-4536: PackedInts on-disk format is now byte-aligned (it used to be

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1408092&r1=1408091&r2=1408092&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Sun Nov 11 19:34:17 2012
@@ -302,7 +302,7 @@ public class ConcurrentMergeScheduler ex
   }
 
   @Override
-  public void merge(IndexWriter writer) throws IOException {
+  public synchronized void merge(IndexWriter writer) throws IOException {
 
     assert !Thread.holdsLock(writer);
 
@@ -328,31 +328,34 @@ public class ConcurrentMergeScheduler ex
     // pending merges, until it's empty:
     while (true) {
 
-      synchronized(this) {
-        long startStallTime = 0;
-        while (mergeThreadCount() >= 1+maxMergeCount) {
-          startStallTime = System.currentTimeMillis();
-          if (verbose()) {
-            message("    too many merges; stalling...");
-          }
-          try {
-            wait();
-          } catch (InterruptedException ie) {
-            throw new ThreadInterruptedException(ie);
-          }
-        }
-
+      long startStallTime = 0;
+      while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
+        // This means merging has fallen too far behind: we
+        // have already created maxMergeCount threads, and
+        // now there's at least one more merge pending.
+        // Note that only maxThreadCount of
+        // those created merge threads will actually be
+        // running; the rest will be paused (see
+        // updateMergeThreads).  We stall this producer
+        // thread to prevent creation of new segments,
+        // until merging has caught up:
+        startStallTime = System.currentTimeMillis();
         if (verbose()) {
-          if (startStallTime != 0) {
-            message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
-          }
+          message("    too many merges; stalling...");
+        }
+        try {
+          wait();
+        } catch (InterruptedException ie) {
+          throw new ThreadInterruptedException(ie);
         }
       }
 
+      if (verbose()) {
+        if (startStallTime != 0) {
+          message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+        }
+      }
 
-      // TODO: we could be careful about which merges to do in
-      // the BG (eg maybe the "biggest" ones) vs FG, which
-      // merges to do first (the easiest ones?), etc.
       MergePolicy.OneMerge merge = writer.getNextMerge();
       if (merge == null) {
         if (verbose()) {
@@ -361,34 +364,28 @@ public class ConcurrentMergeScheduler ex
         return;
       }
 
-      // We do this w/ the primary thread to keep
-      // deterministic assignment of segment names
-      writer.mergeInit(merge);
-
       boolean success = false;
       try {
-        synchronized(this) {
-          if (verbose()) {
-            message("  consider merge " + writer.segString(merge.segments));
-          }
+        if (verbose()) {
+          message("  consider merge " + writer.segString(merge.segments));
+        }
 
-          // OK to spawn a new merge thread to handle this
-          // merge:
-          final MergeThread merger = getMergeThread(writer, merge);
-          mergeThreads.add(merger);
-          if (verbose()) {
-            message("    launch new thread [" + merger.getName() + "]");
-          }
+        // OK to spawn a new merge thread to handle this
+        // merge:
+        final MergeThread merger = getMergeThread(writer, merge);
+        mergeThreads.add(merger);
+        if (verbose()) {
+          message("    launch new thread [" + merger.getName() + "]");
+        }
 
-          merger.start();
+        merger.start();
 
-          // Must call this after starting the thread else
-          // the new thread is removed from mergeThreads
-          // (since it's not alive yet):
-          updateMergeThreads();
+        // Must call this after starting the thread else
+        // the new thread is removed from mergeThreads
+        // (since it's not alive yet):
+        updateMergeThreads();
 
-          success = true;
-        }
+        success = true;
       } finally {
         if (!success) {
           writer.mergeFinish(merge);
@@ -482,7 +479,6 @@ public class ConcurrentMergeScheduler ex
           // merge that writer says is necessary:
           merge = tWriter.getNextMerge();
           if (merge != null) {
-            tWriter.mergeInit(merge);
             updateMergeThreads();
             if (verbose()) {
               message("  merge thread: do another merge " + tWriter.segString(merge.segments));
@@ -546,4 +542,13 @@ public class ConcurrentMergeScheduler ex
   void clearSuppressExceptions() {
     suppressExceptions = false;
   }
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
+    sb.append("maxThreadCount=").append(maxThreadCount).append(", ");    
+    sb.append("maxMergeCount=").append(maxMergeCount).append(", ");    
+    sb.append("mergeThreadPriority=").append(mergeThreadPriority);
+    return sb.toString();
+  }
 }

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1408092&r1=1408091&r2=1408092&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Sun Nov 11 19:34:17 2012
@@ -1894,6 +1894,15 @@ public class IndexWriter implements Clos
   }
 
   /**
+   * Expert: returns true if there are merges waiting to be scheduled.
+   * 
+   * @lucene.experimental
+   */
+  public synchronized boolean hasPendingMerges() {
+    return pendingMerges.size() != 0;
+  }
+
+  /**
    * Close the <code>IndexWriter</code> without committing
    * any changes that have occurred since the last commit
    * (or since it was opened, if commit hasn't been called).
@@ -2076,7 +2085,7 @@ public class IndexWriter implements Clos
       // they are aborted.
       while(runningMerges.size() > 0) {
         if (infoStream.isEnabled("IW")) {
-          infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge to abort");
+          infoStream.message("IW", "now wait for " + runningMerges.size() + " running merge/s to abort");
         }
         doWait();
       }

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java?rev=1408092&r1=1408091&r2=1408092&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java Sun Nov 11 19:34:17 2012
@@ -555,7 +555,7 @@ public class LiveIndexWriterConfig {
     sb.append("commit=").append(commit == null ? "null" : commit).append("\n");
     sb.append("openMode=").append(getOpenMode()).append("\n");
     sb.append("similarity=").append(getSimilarity().getClass().getName()).append("\n");
-    sb.append("mergeScheduler=").append(getMergeScheduler().getClass().getName()).append("\n");
+    sb.append("mergeScheduler=").append(getMergeScheduler()).append("\n");
     sb.append("default WRITE_LOCK_TIMEOUT=").append(IndexWriterConfig.WRITE_LOCK_TIMEOUT).append("\n");
     sb.append("writeLockTimeout=").append(getWriteLockTimeout()).append("\n");
     sb.append("codec=").append(getCodec()).append("\n");

Modified: lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java?rev=1408092&r1=1408091&r2=1408092&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Sun Nov 11 19:34:17 2012
@@ -18,14 +18,20 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
 
 public class TestConcurrentMergeScheduler extends LuceneTestCase {
   
@@ -245,4 +251,74 @@ public class TestConcurrentMergeSchedule
 
     directory.close();
   }
+
+  // LUCENE-4544
+  public void testMaxMergeCount() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+
+    final int maxMergeCount = _TestUtil.nextInt(random(), 1, 5);
+    final int maxMergeThreads = _TestUtil.nextInt(random(), 1, maxMergeCount);
+    final CountDownLatch enoughMergesWaiting = new CountDownLatch(maxMergeCount);
+    final AtomicInteger runningMergeCount = new AtomicInteger(0);
+    final AtomicBoolean failed = new AtomicBoolean();
+
+    if (VERBOSE) {
+      System.out.println("TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" + maxMergeThreads);
+    }
+
+    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
+
+      @Override
+      protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+        try {
+          // Stall all incoming merges until we see
+          // maxMergeCount:
+          int count = runningMergeCount.incrementAndGet();
+          try {
+            assertTrue("count=" + count + " vs maxMergeCount=" + maxMergeCount, count <= maxMergeCount);
+            enoughMergesWaiting.countDown();
+
+            // Stall this merge until we see exactly
+            // maxMergeCount merges waiting
+            while (true) {
+              if (enoughMergesWaiting.await(10, TimeUnit.MILLISECONDS) || failed.get()) {
+                break;
+              }
+            }
+            // Then sleep a bit to give a chance for the bug
+            // (too many pending merges) to appear:
+            Thread.sleep(20);
+            super.doMerge(merge);
+          } finally {
+            runningMergeCount.decrementAndGet();
+          }
+        } catch (Throwable t) {
+          failed.set(true);
+          writer.mergeFinish(merge);
+          throw new RuntimeException(t);
+        }
+      }
+      };
+    cms.setMaxThreadCount(maxMergeThreads);
+    cms.setMaxMergeCount(maxMergeCount);
+    iwc.setMergeScheduler(cms);
+    iwc.setMaxBufferedDocs(2);
+
+    TieredMergePolicy tmp = new TieredMergePolicy();
+    iwc.setMergePolicy(tmp);
+    tmp.setMaxMergeAtOnce(2);
+    tmp.setSegmentsPerTier(2);
+
+    IndexWriter w = new IndexWriter(dir, iwc);
+    Document doc = new Document();
+    doc.add(newField("field", "field", TextField.TYPE_NOT_STORED));
+    while(enoughMergesWaiting.getCount() != 0 && !failed.get()) {
+      for(int i=0;i<10;i++) {
+        w.addDocument(doc);
+      }
+    }
+    w.close(false);
+    dir.close();
+  }
 }

Modified: lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java?rev=1408092&r1=1408091&r2=1408092&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Sun Nov 11 19:34:17 2012
@@ -424,7 +424,10 @@ public class TestIndexWriterExceptions e
   public void testExceptionOnMergeInit() throws IOException {
     Directory dir = newDirectory();
     IndexWriterConfig conf = newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random()))
-      .setMaxBufferedDocs(2).setMergeScheduler(new ConcurrentMergeScheduler()).setMergePolicy(newLogMergePolicy());
+      .setMaxBufferedDocs(2).setMergePolicy(newLogMergePolicy());
+    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+    cms.setSuppressExceptions();
+    conf.setMergeScheduler(cms);
     ((LogMergePolicy) conf.getMergePolicy()).setMergeFactor(2);
     MockIndexWriter3 w = new MockIndexWriter3(dir, conf);
     w.doFail = true;



Mime
View raw message