lucene-java-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r892992 - in /lucene/java/trunk: ./ contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/ src/java/org/apache/lucene/index/
Date Mon, 21 Dec 2009 21:38:25 GMT
Author: mikemccand
Date: Mon Dec 21 21:38:24 2009
New Revision: 892992

URL: http://svn.apache.org/viewvc?rev=892992&view=rev
Log:
LUCENE-2164: make CMS smarter about prioritizing its threads

Modified:
    lucene/java/trunk/CHANGES.txt
    lucene/java/trunk/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
    lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
    lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/MergePolicy.java
    lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java

Modified: lucene/java/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/trunk/CHANGES.txt?rev=892992&r1=892991&r2=892992&view=diff
==============================================================================
--- lucene/java/trunk/CHANGES.txt (original)
+++ lucene/java/trunk/CHANGES.txt Mon Dec 21 21:38:24 2009
@@ -119,6 +119,14 @@
 * LUCENE-2161: Improve concurrency of IndexReader, especially in the
   context of near real-time readers.  (Mike McCandless)
 
+* LUCENE-2164: ConcurrentMergeScheduler has more control over merge
+  threads.  First, it gives smaller merges higher thread priority than
+  larger ones.  Second, a new set/getMaxMergeCount setting will pause
+  the larger merges to allow smaller ones to finish.  The defaults for
+  these settings are now dynamic, depending on the number of CPU cores as
+  reported by Runtime.getRuntime().availableProcessors() (Mike
+  McCandless)
+
 Build
 
  * LUCENE-2124: Moved the JDK-based collation support from contrib/collation 

Modified: lucene/java/trunk/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java?rev=892992&r1=892991&r2=892992&view=diff
==============================================================================
--- lucene/java/trunk/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
(original)
+++ lucene/java/trunk/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/CreateIndexTask.java
Mon Dec 21 21:38:24 2009
@@ -22,6 +22,7 @@
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.MergeScheduler;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
 import org.apache.lucene.index.MergePolicy;
 
 import java.io.BufferedOutputStream;
@@ -33,9 +34,15 @@
 /**
  * Create an index. <br>
  * Other side effects: index writer object in perfRunData is set. <br>
- * Relevant properties: <code>merge.factor, max.buffered,
- *  max.field.length, ram.flush.mb [default 0],
- *  [default true]</code>.
+ * Relevant properties: <code>merge.factor (default 10),
+ * max.buffered (default no flush), max.field.length (default
+ * 10,000 tokens), compound (default true), ram.flush.mb [default 0],
+ * merge.policy (default org.apache.lucene.index.LogByteSizeMergePolicy),
+ * merge.scheduler (default
+ * org.apache.lucene.index.ConcurrentMergeScheduler),
+ * concurrent.merge.scheduler.max.thread.count and
+ * concurrent.merge.scheduler.max.merge.count (defaults per
+ * ConcurrentMergeScheduler) </code>.
  * <p>
  * This task also supports a "writer.info.stream" property with the following
  * values:
@@ -66,6 +73,18 @@
       throw new RuntimeException("unable to instantiate class '" + mergeScheduler + "' as
merge scheduler", e);
     }
 
+    if (mergeScheduler.equals("org.apache.lucene.index.ConcurrentMergeScheduler")) {
+      ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) writer.getMergeScheduler();
+      int v = config.get("concurrent.merge.scheduler.max.thread.count", -1);
+      if (v != -1) {
+        cms.setMaxThreadCount(v);
+      }
+      v = config.get("concurrent.merge.scheduler.max.merge.count", -1);
+      if (v != -1) {
+        cms.setMaxMergeCount(v);
+      }
+    }
+
     final String mergePolicy = config.get("merge.policy",
                                           "org.apache.lucene.index.LogByteSizeMergePolicy");
     try {

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=892992&r1=892991&r2=892992&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Mon Dec
21 21:38:24 2009
@@ -23,24 +23,45 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Collections;
 
 /** A {@link MergeScheduler} that runs each merge using a
- *  separate thread, up until a maximum number of threads
- *  ({@link #setMaxThreadCount}) at which when a merge is
- *  needed, the thread(s) that are updating the index will
- *  pause until one or more merges completes.  This is a
- *  simple way to use concurrency in the indexing process
- *  without having to create and manage application level
- *  threads. */
-
+ *  separate thread.
+ *
+ *  <p>Specify the max number of threads that may run at
+ *  once with {@link #setMaxThreadCount}.</p>
+ *
+ *  <p>Separately specify the maximum number of simultaneous
+ *  merges with {@link #setMaxMergeCount}.  If the number of
+ *  merges exceeds the max number of threads then the
+ *  largest merges are paused until one of the smaller
+ *  merges completes.</p>
+ *
+ *  <p>If more than {@link #getMaxMergeCount} merges are
+ *  requested then this class will forcefully throttle the
+ *  incoming threads by pausing until one or more merges
+ *  complete.</p>
+ */ 
 public class ConcurrentMergeScheduler extends MergeScheduler {
 
   private int mergeThreadPriority = -1;
 
   protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
 
-  // Max number of threads allowed to be merging at once
-  private int maxThreadCount = 1;
+  // Max number of merge threads allowed to be running at
+  // once.  When there are more merges than this, we
+  // forcefully pause the larger ones, letting the smaller
+  // ones run, up until maxMergeCount merges at which point
+  // we forcefully pause incoming threads (that presumably
+  // are the ones causing so much merging).  We dynamically
+  // default this from 1 to 3, depending on how many cores
+  // you have:
+  private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
+
+  // Max number of merges we accept before forcefully
+  // throttling the incoming threads
+  private int maxMergeCount = maxThreadCount+2;
 
   protected Directory dir;
 
@@ -55,23 +76,45 @@
     }
   }
 
-  /** Sets the max # simultaneous threads that may be
-   *  running.  If a merge is necessary yet we already have
-   *  this many threads running, the incoming thread (that
-   *  is calling add/updateDocument) will block until
-   *  a merge thread has completed. */
+  /** Sets the max # simultaneous merge threads that should
+   *  be running at once.  This must be <= {@link
+   *  #setMaxMergeCount}. */
   public void setMaxThreadCount(int count) {
-    if (count < 1)
+    if (count < 1) {
       throw new IllegalArgumentException("count should be at least 1");
+    }
+    if (count > maxMergeCount) {
+      throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
+    }
     maxThreadCount = count;
   }
 
-  /** Get the max # simultaneous threads that may be
-   *  running. @see #setMaxThreadCount. */
+  /** @see #setMaxThreadCount. */
   public int getMaxThreadCount() {
     return maxThreadCount;
   }
 
+  /** Sets the max # simultaneous merges that are allowed.
+   *  If a merge is necessary yet we already have this many
+   *  threads running, the incoming thread (that is calling
+   *  add/updateDocument) will block until a merge thread
+   *  has completed.  Note that we will only run the
+   *  smallest {@link #setMaxThreadCount} merges at a time. */
+  public void setMaxMergeCount(int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException("count should be at least 1");
+    }
+    if (count < maxThreadCount) {
+      throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
+    }
+    maxMergeCount = count;
+  }
+
+  /** See {@link #setMaxMergeCount}. */
+  public int getMaxMergeCount() {
+    return maxMergeCount;
+  }
+
   /** Return the priority that merge threads run at.  By
    *  default the priority is 1 plus the priority of (ie,
    *  slightly higher priority than) the first thread that
@@ -81,16 +124,73 @@
     return mergeThreadPriority;
   }
 
-  /** Set the priority that merge threads run at. */
+  /** Set the base priority that merge threads run at.
+   *  Note that CMS may increase priority of some merge
+   *  threads beyond this base priority.  It's best not to
+   *  set this any higher than
+   *  Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
+   *  room to set relative priority among threads.  */
   public synchronized void setMergeThreadPriority(int pri) {
     if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
       throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
     mergeThreadPriority = pri;
+    updateMergeThreads();
+  }
 
-    final int numThreads = mergeThreadCount();
-    for(int i=0;i<numThreads;i++) {
-      MergeThread merge = mergeThreads.get(i);
-      merge.setThreadPriority(pri);
+  // Larger merges come first
+  protected static class CompareByMergeDocCount implements Comparator<MergeThread>
{
+    public int compare(MergeThread t1, MergeThread t2) {
+      final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
+      final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
+      
+      final int c1 = m1 == null ? Integer.MAX_VALUE : m1.segments.totalDocCount();
+      final int c2 = m2 == null ? Integer.MAX_VALUE : m2.segments.totalDocCount();
+
+      return c2 - c1;
+    }
+  }
+
+  /** Called whenever the running merges have changed, to
+   *  pause & unpause threads. */
+  protected synchronized void updateMergeThreads() {
+
+    Collections.sort(mergeThreads, new CompareByMergeDocCount());
+    
+    final int count = mergeThreads.size();
+    int pri = mergeThreadPriority;
+    for(int i=0;i<count;i++) {
+      final MergeThread mergeThread = mergeThreads.get(i);
+      final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
+      if (merge == null) {
+        continue;
+      }
+      final boolean doPause;
+      if (i < count-maxThreadCount) {
+        doPause = true;
+      } else {
+        doPause = false;
+      }
+
+      if (verbose()) {
+        if (doPause != merge.getPause()) {
+          if (doPause) {
+            message("pause thread " + mergeThread.getName());
+          } else {
+            message("unpause thread " + mergeThread.getName());
+          }
+        }
+      }
+      if (doPause != merge.getPause()) {
+        merge.setPause(doPause);
+      }
+
+      if (!doPause) {
+        if (verbose()) {
+          message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
+        }
+        mergeThread.setThreadPriority(pri);
+        pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
+      }
     }
   }
 
@@ -192,9 +292,12 @@
       try {
         synchronized(this) {
           final MergeThread merger;
-          while (mergeThreadCount() >= maxThreadCount) {
-            if (verbose())
-              message("    too many merge threads running; stalling...");
+          long startStallTime = 0;
+          while (mergeThreadCount() >= maxMergeCount) {
+            startStallTime = System.currentTimeMillis();
+            if (verbose()) {
+              message("    too many merges; stalling...");
+            }
             try {
               wait();
             } catch (InterruptedException ie) {
@@ -202,15 +305,20 @@
             }
           }
 
-          if (verbose())
+          if (verbose()) {
+            if (startStallTime != 0) {
+              message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+            }
             message("  consider merge " + merge.segString(dir));
-      
-          assert mergeThreadCount() < maxThreadCount;
+          }
+
+          assert mergeThreadCount() < maxMergeCount;
 
           // OK to spawn a new merge thread to handle this
           // merge:
           merger = getMergeThread(writer, merge);
           mergeThreads.add(merger);
+          updateMergeThreads();
           if (verbose())
             message("    launch new thread [" + merger.getName() + "]");
 
@@ -245,6 +353,7 @@
     IndexWriter writer;
     MergePolicy.OneMerge startMerge;
     MergePolicy.OneMerge runningMerge;
+    private volatile boolean done;
 
     public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException
{
       this.writer = writer;
@@ -259,6 +368,16 @@
       return runningMerge;
     }
 
+    public synchronized MergePolicy.OneMerge getCurrentMerge() {
+      if (done) {
+        return null;
+      } else if (runningMerge != null) {
+        return runningMerge;
+      } else {
+        return startMerge;
+      }
+    }
+
     public void setThreadPriority(int pri) {
       try {
         setPriority(pri);
@@ -292,10 +411,14 @@
           merge = writer.getNextMerge();
           if (merge != null) {
             writer.mergeInit(merge);
+            updateMergeThreads();
             if (verbose())
               message("  merge thread: do another merge " + merge.segString(dir));
-          } else
+          } else {
+            done = true;
+            updateMergeThreads();
             break;
+          }
         }
 
         if (verbose())
@@ -317,6 +440,7 @@
           ConcurrentMergeScheduler.this.notifyAll();
           boolean removed = mergeThreads.remove(this);
           assert removed;
+          updateMergeThreads();
         }
       }
     }

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java?rev=892992&r1=892991&r2=892992&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java Mon Dec 21 21:38:24
2009
@@ -3944,7 +3944,7 @@
       handleOOM(oom, "merge");
     }
     if (infoStream != null) {
-      message("merge time " + (System.currentTimeMillis()-t0) + " msec");
+      message("merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.docCount + " docs");
     }
   }
 

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/MergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/MergePolicy.java?rev=892992&r1=892991&r2=892992&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/MergePolicy.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/MergePolicy.java Mon Dec 21 21:38:24
2009
@@ -85,6 +85,7 @@
     final boolean useCompoundFile;
     boolean aborted;
     Throwable error;
+    boolean paused;
 
     public OneMerge(SegmentInfos segments, boolean useCompoundFile) {
       if (0 == segments.size())
@@ -110,6 +111,7 @@
      *  not be committed. */
     synchronized void abort() {
       aborted = true;
+      notifyAll();
     }
 
     /** Returns true if this merge was aborted. */
@@ -118,8 +120,34 @@
     }
 
     synchronized void checkAborted(Directory dir) throws MergeAbortedException {
-      if (aborted)
+      if (aborted) {
         throw new MergeAbortedException("merge is aborted: " + segString(dir));
+      }
+
+      while (paused) {
+        try {
+          // In theory we could wait() indefinitely, but we
+          // do 1000 msec, defensively
+          wait(1000);
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie);
+        }
+        if (aborted) {
+          throw new MergeAbortedException("merge is aborted: " + segString(dir));
+        }
+      }
+    }
+
+    synchronized public void setPause(boolean paused) {
+      this.paused = paused;
+      if (!paused) {
+        // Wakeup merge thread, if it's waiting
+        notifyAll();
+      }
+    }
+
+    synchronized public boolean getPause() {
+      return paused;
     }
 
     String segString(Directory dir) {
@@ -262,5 +290,4 @@
    * compound file format.
    */
   public abstract boolean useCompoundDocStore(SegmentInfos segments);
-  
 }

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java?rev=892992&r1=892991&r2=892992&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java Mon Dec 21 21:38:24
2009
@@ -911,4 +911,14 @@
         return true;
     return false;
   }
+
+  /** Returns sum of all segments' docCounts.  Note that
+   *  this does not include deletions */
+  public int totalDocCount() {
+    int count = 0;
+    for(SegmentInfo info : this) {
+      count += info.docCount;
+    }
+    return count;
+  }
 }



Mime
View raw message