lucene-java-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r576798 [1/3] - in /lucene/java/trunk/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/ test/org/apache/lucene/store/ test/org/apache/lucene/util/
Date Tue, 18 Sep 2007 09:27:17 GMT
Author: mikemccand
Date: Tue Sep 18 02:27:14 2007
New Revision: 576798

URL: http://svn.apache.org/viewvc?rev=576798&view=rev
Log:
LUCENE-845, LUCENE-847, LUCENE-870: factor MergePolicy & MergeScheduler out of IndexWriter,
improve overall concurrency of IndexWriter, and add ConcurrentMergeScheduler

Added:
    lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java   (with
props)
    lucene/java/trunk/src/java/org/apache/lucene/index/LogByteSizeMergePolicy.java   (with
props)
    lucene/java/trunk/src/java/org/apache/lucene/index/LogDocMergePolicy.java   (with props)
    lucene/java/trunk/src/java/org/apache/lucene/index/LogMergePolicy.java   (with props)
    lucene/java/trunk/src/java/org/apache/lucene/index/MergePolicy.java   (with props)
    lucene/java/trunk/src/java/org/apache/lucene/index/MergeScheduler.java   (with props)
    lucene/java/trunk/src/java/org/apache/lucene/index/SerialMergeScheduler.java   (with props)
    lucene/java/trunk/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java 
 (with props)
    lucene/java/trunk/src/test/org/apache/lucene/index/TestThreadedOptimize.java   (with props)
Modified:
    lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/IndexFileDeleter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/IndexModifier.java
    lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/PositionBasedTermVectorMapper.java
    lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfo.java
    lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java
    lucene/java/trunk/src/test/org/apache/lucene/index/DocHelper.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestDoc.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestDocumentWriter.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterMergePolicy.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterMerging.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java
    lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java
    lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java
    lucene/java/trunk/src/test/org/apache/lucene/util/_TestUtil.java

Added: lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=576798&view=auto
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (added)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Tue Sep
18 02:27:14 2007
@@ -0,0 +1,277 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.store.Directory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.ArrayList;
+
+/** A {@link MergeScheduler} that runs each merge using a
+ *  separate thread, up until a maximum number of threads
+ *  ({@link #setMaxThreadCount}) at which point merges are
+ *  run in the foreground, serially.  This is a simple way
+ *  to use concurrency in the indexing process without
+ *  having to create and manage application level
+ *  threads. */
+
+public class ConcurrentMergeScheduler implements MergeScheduler {
+
+  public static boolean VERBOSE = false;
+
+  private int mergeThreadPriority = -1;
+
+  private List mergeThreads = new ArrayList();
+  private int maxThreadCount = 3;
+
+  private List exceptions = new ArrayList();
+  private Directory dir;
+
+  /** Sets the max # simultaneous threads that may be
+   *  running.  If a merge is necessary yet we already have
+   *  this many threads running, the merge is returned back
+   *  to IndexWriter so that it runs in the "foreground". */
+  public void setMaxThreadCount(int count) {
+    if (count < 1)
+      throw new IllegalArgumentException("count should be at least 1");
+    maxThreadCount = count;
+  }
+
+  /** Get the max # simultaneous threads that may be
+   *  running.  See {@link #setMaxThreadCount}. */
+  public int getMaxThreadCount() {
+    return maxThreadCount;
+  }
+
+  /** Return the priority that merge threads run at.  By
+   *  default the priority is 1 plus the priority of (ie,
+   *  slightly higher priority than) the first thread that
+   *  calls merge. */
+  public synchronized int getMergeThreadPriority() {
+    initMergeThreadPriority();
+    return mergeThreadPriority;
+  }
+
+  /** Set the priority that merge threads run at. */
+  public synchronized void setMergeThreadPriority(int pri) {
+    mergeThreadPriority = pri;
+
+    final int numThreads = mergeThreads.size();
+    for(int i=0;i<numThreads;i++) {
+      MergeThread merge = (MergeThread) mergeThreads.get(i);
+      try {
+        merge.setPriority(pri);
+      } catch (NullPointerException npe) {
+        // Strangely, Sun's JDK 1.5 on Linux sometimes
+        // throws NPE out of here...
+      }
+    }
+  }
+
+  /** Returns any exceptions that were caught in the merge
+   *  threads. */
+  public List getExceptions() {
+    return exceptions;
+  }
+
+  private void message(String message) {
+    System.out.println("CMS [" + Thread.currentThread().getName() + "]: " + message);
+  }
+
+  private synchronized void initMergeThreadPriority() {
+    if (mergeThreadPriority == -1)
+      // Default to slightly higher priority than our
+      // calling thread
+      mergeThreadPriority = 1+Thread.currentThread().getPriority();
+  }
+
+  public void close() {}
+
+  private synchronized void finishThreads() {
+    while(mergeThreads.size() > 0) {
+      if (VERBOSE) {
+        message("now wait for threads; currently " + mergeThreads.size() + " still running");
+        for(int i=0;i<mergeThreads.size();i++) {
+          final MergeThread mergeThread = ((MergeThread) mergeThreads.get(i));
+          message("    " + i + ": " + mergeThread.merge.segString(dir));
+        }
+      }
+
+      try {
+        wait();
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  public void sync() {
+    finishThreads();
+  }
+
+  // Used for testing
+  private boolean suppressExceptions;
+
+  /** Used for testing */
+  void setSuppressExceptions() {
+    suppressExceptions = true;
+  }
+  void clearSuppressExceptions() {
+    suppressExceptions = false;
+  }
+
+  public void merge(IndexWriter writer)
+    throws CorruptIndexException, IOException {
+
+    initMergeThreadPriority();
+
+    dir = writer.getDirectory();
+
+    // First, quickly run through the newly proposed merges
+    // and add any orthogonal merges (ie a merge not
+    // involving segments already pending to be merged) to
+    // the queue.  If we are way behind on merging, many of
+    // these newly proposed merges will likely already be
+    // registered.
+
+    if (VERBOSE) {
+      message("now merge");
+      message("  index: " + writer.segString());
+    }
+
+    // Iterate, pulling from the IndexWriter's queue of
+    // pending merges, until it's empty:
+    while(true) {
+
+      // TODO: we could be careful about which merges to do in
+      // the BG (eg maybe the "biggest" ones) vs FG, which
+      // merges to do first (the easiest ones?), etc.
+
+      MergePolicy.OneMerge merge = writer.getNextMerge();
+      if (merge == null) {
+        if (VERBOSE)
+          message("  no more merges pending; now return");
+        return;
+      }
+
+      // We do this w/ the primary thread to keep
+      // deterministic assignment of segment names
+      writer.mergeInit(merge);
+
+      if (VERBOSE)
+        message("  consider merge " + merge.segString(dir));
+      
+      if (merge.isExternal) {
+        if (VERBOSE)
+          message("    merge involves segments from an external directory; now run in foreground");
+      } else {
+        synchronized(this) {
+          if (mergeThreads.size() < maxThreadCount) {
+            // OK to spawn a new merge thread to handle this
+            // merge:
+            MergeThread merger = new MergeThread(writer, merge);
+            mergeThreads.add(merger);
+            if (VERBOSE)
+              message("    launch new thread [" + merger.getName() + "]");
+            try {
+              merger.setPriority(mergeThreadPriority);
+            } catch (NullPointerException npe) {
+              // Strangely, Sun's JDK 1.5 on Linux sometimes
+              // throws NPE out of here...
+            }
+            merger.start();
+            continue;
+          } else if (VERBOSE)
+            message("    too many merge threads running; run merge in foreground");
+        }
+      }
+
+      // Either the merge is external or too many merge threads are
+      // already running, so we do this in the foreground of the calling thread
+      writer.merge(merge);
+    }
+  }
+
+  private class MergeThread extends Thread {
+
+    IndexWriter writer;
+    MergePolicy.OneMerge merge;
+
+    public MergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
+      this.writer = writer;
+      this.merge = merge;
+    }
+
+    public void run() {
+      try {
+
+        if (VERBOSE)
+          message("  merge thread: start");
+
+        // First time through the while loop we do the merge
+        // that we were started with:
+        MergePolicy.OneMerge merge = this.merge;
+
+        while(true) {
+          writer.merge(merge);
+
+          // Subsequent times through the loop we do any new
+          // merge that writer says is necessary:
+          merge = writer.getNextMerge();
+          if (merge != null) {
+            writer.mergeInit(merge);
+            if (VERBOSE)
+              message("  merge thread: do another merge " + merge.segString(dir));
+          } else
+            break;
+        }
+
+        if (VERBOSE)
+          message("  merge thread: done");
+
+      } catch (Throwable exc) {
+        // When a merge was aborted & IndexWriter closed,
+        // it's possible to get various IOExceptions,
+        // NullPointerExceptions, AlreadyClosedExceptions:
+        merge.setException(exc);
+        writer.addMergeException(merge);
+
+        if (!merge.isAborted()) {
+          // If the merge was not aborted then the exception
+          // is real
+          exceptions.add(exc);
+          
+          if (!suppressExceptions)
+            // suppressExceptions is normally only set during
+            // testing.
+            throw new MergePolicy.MergeException(exc);
+        }
+      } finally {
+        synchronized(ConcurrentMergeScheduler.this) {
+          mergeThreads.remove(this);
+          ConcurrentMergeScheduler.this.notifyAll();
+        }
+      }
+    }
+
+    public String toString() {
+      return "merge thread: " + merge.segString(dir);
+    }
+  }
+}

Propchange: lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=576798&r1=576797&r2=576798&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Sep 18 02:27:14
2007
@@ -113,6 +113,7 @@
 
   private int nextDocID;                          // Next docID to be added
   private int numDocsInRAM;                       // # docs buffered in RAM
+  private int numDocsInStore;                     // # docs written to doc stores
   private int nextWriteDocID;                     // Next docID to be written
 
   // Max # ThreadState instances; if there are more threads
@@ -238,6 +239,7 @@
       String s = docStoreSegment;
       docStoreSegment = null;
       docStoreOffset = 0;
+      numDocsInStore = 0;
       return s;
     } else {
       return null;
@@ -245,6 +247,11 @@
   }
 
   private List files = null;                      // Cached list of files we've created
+  private List abortedFiles = null;               // List of files that were written before last abort()
+
+  List abortedFiles() {
+    return abortedFiles;
+  }
 
   /* Returns list of files in use by this instance,
    * including any flushed segments. */
@@ -278,6 +285,9 @@
    *  docs added since last flush. */
   synchronized void abort() throws IOException {
 
+    if (infoStream != null)
+      infoStream.println("docWriter: now abort");
+
     // Forcefully remove waiting ThreadStates from line
     for(int i=0;i<numWaiting;i++)
       waitingThreadStates[i].isIdle = true;
@@ -290,6 +300,8 @@
 
     try {
 
+      abortedFiles = files();
+
       // Discard pending norms:
       final int numField = fieldInfos.size();
       for (int i=0;i<numField;i++) {
@@ -332,6 +344,7 @@
       }
 
       files = null;
+
     } finally {
       resumeAllThreads();
     }
@@ -398,7 +411,7 @@
 
     newFiles = new ArrayList();
 
-    docStoreOffset += numDocsInRAM;
+    docStoreOffset = numDocsInStore;
 
     if (closeDocStore) {
       assert docStoreSegment != null;
@@ -2119,6 +2132,7 @@
       segment = writer.newSegmentName();
 
     numDocsInRAM++;
+    numDocsInStore++;
 
     // We must at this point commit to flushing to ensure we
     // always get N docs when we flush by doc count, even if

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=576798&r1=576797&r2=576798&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexFileDeleter.java Tue Sep 18 02:27:14
2007
@@ -105,7 +105,7 @@
   }
   
   private void message(String message) {
-    infoStream.println(this + " " + Thread.currentThread().getName() + ": " + message);
+    infoStream.println("Deleter [" + Thread.currentThread().getName() + "]: " + message);
   }
 
   /**
@@ -275,25 +275,59 @@
    * Writer calls this when it has hit an error and had to
    * roll back, to tell us that there may now be
    * unreferenced files in the filesystem.  So we re-list
-   * the filesystem and delete such files:
+   * the filesystem and delete such files.  If segmentName
+   * is non-null, we will only delete files corresponding to
+   * that segment.
    */
-  public void refresh() throws IOException {
+  public void refresh(String segmentName) throws IOException {
     String[] files = directory.list();
     if (files == null)
       throw new IOException("cannot read directory " + directory + ": list() returned null");
     IndexFileNameFilter filter = IndexFileNameFilter.getFilter();
+    String segmentPrefix1;
+    String segmentPrefix2;
+    if (segmentName != null) {
+      segmentPrefix1 = segmentName + ".";
+      segmentPrefix2 = segmentName + "_";
+    } else {
+      segmentPrefix1 = null;
+      segmentPrefix2 = null;
+    }
+    
     for(int i=0;i<files.length;i++) {
       String fileName = files[i];
-      if (filter.accept(null, fileName) && !refCounts.containsKey(fileName) && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
+      if (filter.accept(null, fileName) &&
+          (segmentName == null || fileName.startsWith(segmentPrefix1) || fileName.startsWith(segmentPrefix2)) &&
+          !refCounts.containsKey(fileName) &&
+          !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
         // Unreferenced file, so remove it
         if (infoStream != null) {
-          message("refresh: removing newly created unreferenced file \"" + fileName + "\"");
+          message("refresh [prefix=" + segmentName + "]: removing newly created unreferenced file \"" + fileName + "\"");
         }
         deleteFile(fileName);
       }
     }
   }
 
+  public void refresh() throws IOException {
+    refresh(null);
+  }
+
+  public void close() throws IOException {
+    deletePendingFiles();
+  }
+
+  private void deletePendingFiles() throws IOException {
+    if (deletable != null) {
+      List oldDeletable = deletable;
+      deletable = null;
+      int size = oldDeletable.size();
+      for(int i=0;i<size;i++) {
+        deleteFile((String) oldDeletable.get(i));
+      }
+    }
+  }
+
   /**
    * For definition of "check point" see IndexWriter comments:
    * "Clarification: Check Points (and commits)".
@@ -322,19 +356,17 @@
 
     // Try again now to delete any previously un-deletable
     // files (because they were in use, on Windows):
-    if (deletable != null) {
-      List oldDeletable = deletable;
-      deletable = null;
-      int size = oldDeletable.size();
-      for(int i=0;i<size;i++) {
-        deleteFile((String) oldDeletable.get(i));
-      }
-    }
+    deletePendingFiles();
 
     // Incref the files:
     incRef(segmentInfos, isCommit);
-    if (docWriter != null)
-      incRef(docWriter.files());
+    final List docWriterFiles;
+    if (docWriter != null) {
+      docWriterFiles = docWriter.files();
+      if (docWriterFiles != null)
+        incRef(docWriterFiles);
+    } else
+      docWriterFiles = null;
 
     if (isCommit) {
       // Append to our commits list:
@@ -364,9 +396,9 @@
           lastFiles.add(segmentInfo.files());
         }
       }
-      if (docWriter != null)
-        lastFiles.add(docWriter.files());
     }
+    if (docWriterFiles != null)
+      lastFiles.add(docWriterFiles);
   }
 
   void incRef(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
@@ -385,7 +417,7 @@
     }
   }
 
-  private void incRef(List files) throws IOException {
+  void incRef(List files) throws IOException {
     int size = files.size();
     for(int i=0;i<size;i++) {
       String fileName = (String) files.get(i);
@@ -397,7 +429,7 @@
     }
   }
 
-  private void decRef(List files) throws IOException {
+  void decRef(List files) throws IOException {
     int size = files.size();
     for(int i=0;i<size;i++) {
       decRef((String) files.get(i));
@@ -438,7 +470,22 @@
     return rc;
   }
 
-  private void deleteFile(String fileName)
+  void deleteFiles(List files) throws IOException {
+    final int size = files.size();
+    for(int i=0;i<size;i++)
+      deleteFile((String) files.get(i));
+  }
+
+  /** Deletes the specified files, but only if they are new
+   *  (have not yet been incref'd). */
+  void deleteNewFiles(List files) throws IOException {
+    final int size = files.size();
+    for(int i=0;i<size;i++)
+      if (!refCounts.containsKey(files.get(i)))
+        deleteFile((String) files.get(i));
+  }
+
+  void deleteFile(String fileName)
        throws IOException {
     try {
       if (infoStream != null) {
@@ -490,11 +537,12 @@
 
     int count;
 
-    final private int IncRef() {
+    final public int IncRef() {
       return ++count;
     }
 
-    final private int DecRef() {
+    final public int DecRef() {
+      assert count > 0;
       return --count;
     }
   }

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexModifier.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexModifier.java?rev=576798&r1=576797&r2=576798&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexModifier.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexModifier.java Tue Sep 18 02:27:14
2007
@@ -202,6 +202,10 @@
         indexReader = null;
       }
       indexWriter = new IndexWriter(directory, analyzer, false);
+      // IndexModifier cannot use ConcurrentMergeScheduler
+      // because it synchronizes on the directory which can
+      // cause deadlock
+      indexWriter.setMergeScheduler(new SerialMergeScheduler());
       indexWriter.setInfoStream(infoStream);
       indexWriter.setUseCompoundFile(useCompoundFile);
       if (maxBufferedDocs != 0)



Mime
View raw message