lucene-java-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r690537 - in /lucene/java/trunk/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/
Date Sat, 30 Aug 2008 17:16:32 GMT
Author: mikemccand
Date: Sat Aug 30 10:16:29 2008
New Revision: 690537

URL: http://svn.apache.org/viewvc?rev=690537&view=rev
Log:
LUCENE-1135: fix concurrency issues in IndexWriter when calling commit, rollback, close, optimize, addIndexes*

Modified:
    lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
    lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=690537&r1=690536&r2=690537&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Sat Aug 30 10:16:29 2008
@@ -187,24 +187,15 @@
 
         message("  consider merge " + merge.segString(dir));
       
-        if (merge.isExternal) {
-          message("    merge involves segments from an external directory; now run in foreground");
-        } else {
-          assert mergeThreadCount() < maxThreadCount;
-
-          // OK to spawn a new merge thread to handle this
-          // merge:
-          final MergeThread merger = getMergeThread(writer, merge);
-          mergeThreads.add(merger);
-          message("    launch new thread [" + merger.getName() + "]");
-          merger.start();
-          continue;
-        }
-      }
+        assert mergeThreadCount() < maxThreadCount;
 
-      // This merge involves segments outside our index
-      // Directory so we must merge in foreground
-      doMerge(merge);
+        // OK to spawn a new merge thread to handle this
+        // merge:
+        final MergeThread merger = getMergeThread(writer, merge);
+        mergeThreads.add(merger);
+        message("    launch new thread [" + merger.getName() + "]");
+        merger.start();
+      }
     }
   }
 

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java?rev=690537&r1=690536&r2=690537&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java Sat Aug 30 10:16:29 2008
@@ -300,7 +300,7 @@
 
   private Similarity similarity = Similarity.getDefault(); // how to normalize
 
-  private volatile long changeCount; // increments every a change is completed
+  private volatile long changeCount; // increments every time a change is completed
   private long lastCommitChangeCount; // last changeCount that was committed
 
   private SegmentInfos rollbackSegmentInfos;      // segmentInfos we will fallback to if the commit fails
@@ -345,18 +345,58 @@
   private int flushDeletesCount;
   private double maxSyncPauseSeconds = DEFAULT_MAX_SYNC_PAUSE_SECONDS;
 
+  // Used to only allow one addIndexes to proceed at once
+  // TODO: use ReadWriteLock once we are on 5.0
+  private int readCount;                          // count of how many threads are holding read lock
+  private Thread writeThread;                     // non-null if any thread holds write lock
+    
+  synchronized void acquireWrite() {
+    while(writeThread != null || readCount > 0)
+      doWait();
+
+    // We could have been closed while we were waiting:
+    ensureOpen();
+
+    writeThread = Thread.currentThread();
+  }
+
+  synchronized void releaseWrite() {
+    assert Thread.currentThread() == writeThread;
+    writeThread = null;
+    notifyAll();
+  }
+
+  synchronized void acquireRead() {
+    final Thread current = Thread.currentThread();
+    while(writeThread != null && writeThread != current)
+      doWait();
+
+    readCount++;
+  }
+
+  synchronized void releaseRead() {
+    readCount--;
+    assert readCount >= 0;
+    if (0 == readCount)
+      notifyAll();
+  }
+
   /**
    * Used internally to throw an {@link
    * AlreadyClosedException} if this IndexWriter has been
    * closed.
    * @throws AlreadyClosedException if this IndexWriter is
    */
-  protected final void ensureOpen() throws AlreadyClosedException {
-    if (closed) {
+  protected synchronized final void ensureOpen(boolean includePendingClose) throws AlreadyClosedException {
+    if (closed || (includePendingClose && closing)) {
       throw new AlreadyClosedException("this IndexWriter is closed");
     }
   }
 
+  protected synchronized final void ensureOpen() throws AlreadyClosedException {
+    ensureOpen(true);
+  }
+
   /**
    * Prints a message to the infoStream (if non-null),
    * prefixed with the identifying information for this
@@ -469,7 +509,8 @@
    * @see #setTermIndexInterval(int)
    */
   public int getTermIndexInterval() {
-    ensureOpen();
+    // We pass false because this method is called by SegmentMerger while we are in the process of closing
+    ensureOpen(false);
     return termIndexInterval;
   }
 
@@ -1127,7 +1168,7 @@
       }
 
       this.autoCommit = autoCommit;
-      setRollbackSegmentInfos();
+      setRollbackSegmentInfos(segmentInfos);
 
       docWriter = new DocumentsWriter(directory, this);
       docWriter.setInfoStream(infoStream);
@@ -1153,8 +1194,9 @@
     }
   }
 
-  private void setRollbackSegmentInfos() {
-    rollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
+  private synchronized void setRollbackSegmentInfos(SegmentInfos infos) {
+    rollbackSegmentInfos = (SegmentInfos) infos.clone();
+    assert !hasExternalSegments(rollbackSegmentInfos);
     rollbackSegments = new HashMap();
     final int size = rollbackSegmentInfos.size();
     for(int i=0;i<size;i++)
@@ -1189,7 +1231,7 @@
   /**
    * Expert: set the merge scheduler used by this writer.
    */
-  public void setMergeScheduler(MergeScheduler mergeScheduler) throws CorruptIndexException, IOException {
+  synchronized public void setMergeScheduler(MergeScheduler mergeScheduler) throws CorruptIndexException, IOException {
     ensureOpen();
     if (mergeScheduler == null)
       throw new NullPointerException("MergeScheduler must be non-null");
@@ -1645,10 +1687,7 @@
           // Another thread is presently trying to close;
           // wait until it finishes one way (closes
           // successfully) or another (fails to close)
-          try {
-            wait();
-          } catch (InterruptedException ie) {
-          }
+          doWait();
         }
       } else
         return false;
@@ -1656,6 +1695,9 @@
   }
 
   private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException {
+
+    docWriter.pauseAllThreads();
+
     try {
       if (infoStream != null)
         message("now flush at close");
@@ -1707,8 +1749,12 @@
       synchronized(this) {
         closing = false;
         notifyAll();
-        if (!closed && infoStream != null)
-          message("hit exception while closing");
+        if (!closed) {
+          if (docWriter != null)
+            docWriter.resumeAllThreads();
+          if (infoStream != null)
+            message("hit exception while closing");
+        }
       }
     }
   }
@@ -1792,8 +1838,9 @@
   }
 
   /** Returns the Directory used by this index. */
-  public Directory getDirectory() {
-    ensureOpen();
+  public Directory getDirectory() {     
+    // Pass false because the flush during closing calls getDirectory
+    ensureOpen(false);
     return directory;
   }
 
@@ -2287,11 +2334,7 @@
     if (doWait) {
       synchronized(this) {
         while(optimizeMergesPending()) {
-          try {
-            wait();
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
+          doWait();
 
           if (mergeExceptions.size() > 0) {
             // Forward any exceptions in background merge
@@ -2310,6 +2353,12 @@
           }
         }
       }
+
+      // If close is called while we are still
+      // running, throw an exception so the calling
+      // thread will know the optimize did not
+      // complete
+      ensureOpen();
     }
 
     // NOTE: in the ConcurrentMergeScheduler case, when
@@ -2364,6 +2413,9 @@
         boolean running = true;
         while(running) {
 
+          // Check each merge that MergePolicy asked us to
+          // do, to see if any of them are still running and
+          // if any of them have hit an exception.
           running = false;
           for(int i=0;i<numMerges;i++) {
             final MergePolicy.OneMerge merge = (MergePolicy.OneMerge) spec.merges.get(i);
@@ -2377,13 +2429,9 @@
             }
           }
 
-          if (running) {
-            try {
-              wait();
-            } catch (InterruptedException ie) {
-              Thread.currentThread().interrupt();
-            }
-          }
+          // If any of our merges are still running, wait:
+          if (running)
+            doWait();
         }
       }
     }
@@ -2481,6 +2529,28 @@
     }
   }
 
+  /** Like getNextMerge() except only returns a merge if it's
+   *  external. */
+  private synchronized MergePolicy.OneMerge getNextExternalMerge() {
+    if (pendingMerges.size() == 0)
+      return null;
+    else {
+      Iterator it = pendingMerges.iterator();
+      while(it.hasNext()) {
+        // Advance the merge from pending to running
+        MergePolicy.OneMerge merge = (MergePolicy.OneMerge) it.next();
+        if (merge.isExternal) {
+          it.remove();
+          runningMerges.add(merge);
+          return merge;
+        }
+      }
+
+      // None of the pending merges involve external segments
+      return null;
+    }
+  }
+
   /*
    * Begin a transaction.  During a transaction, any segment
    * merges that happen (or ram segments flushed) will not
@@ -2494,33 +2564,66 @@
    * within the transactions, so they must be flushed before the
    * transaction is started.
    */
-  private synchronized void startTransaction() throws IOException {
+  private synchronized void startTransaction(boolean haveWriteLock) throws IOException {
 
-    if (infoStream != null)
-      message("now start transaction");
+    boolean success = false;
+    try {
+      if (infoStream != null)
+        message("now start transaction");
 
-    assert docWriter.getNumBufferedDeleteTerms() == 0 :
-           "calling startTransaction with buffered delete terms not supported";
-    assert docWriter.getNumDocsInRAM() == 0 :
-           "calling startTransaction with buffered documents not supported";
-
-    localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
-    localAutoCommit = autoCommit;
-    localFlushedDocCount = docWriter.getFlushedDocCount();
+      assert docWriter.getNumBufferedDeleteTerms() == 0 :
+      "calling startTransaction with buffered delete terms not supported: numBufferedDeleteTerms=" + docWriter.getNumBufferedDeleteTerms();
+      assert docWriter.getNumDocsInRAM() == 0 :
+      "calling startTransaction with buffered documents not supported: numDocsInRAM=" + docWriter.getNumDocsInRAM();
+
+      ensureOpen();
+
+      // If a transaction is trying to roll back (because
+      // addIndexes hit an exception) then wait here until
+      // that's done:
+      synchronized(this) {
+        while(stopMerges)
+          doWait();
+      }
+      success = true;
+    } finally {
+      // Release the write lock if our caller held it, on
+      // hitting an exception
+      if (!success && haveWriteLock)
+        releaseWrite();
+    }
 
-    if (localAutoCommit) {
+    if (!haveWriteLock)
+      acquireWrite();
 
-      if (infoStream != null)
-        message("flush at startTransaction");
+    success = false;
+    try {
+      localRollbackSegmentInfos = (SegmentInfos) segmentInfos.clone();
 
-      flush(true, false, false);
+      assert !hasExternalSegments(segmentInfos);
 
-      // Turn off auto-commit during our local transaction:
-      autoCommit = false;
-    } else
-      // We must "protect" our files at this point from
-      // deletion in case we need to rollback:
-      deleter.incRef(segmentInfos, false);
+      localAutoCommit = autoCommit;
+      localFlushedDocCount = docWriter.getFlushedDocCount();
+
+      if (localAutoCommit) {
+
+        if (infoStream != null)
+          message("flush at startTransaction");
+
+        flush(true, false, false);
+
+        // Turn off auto-commit during our local transaction:
+        autoCommit = false;
+      } else
+        // We must "protect" our files at this point from
+        // deletion in case we need to rollback:
+        deleter.incRef(segmentInfos, false);
+
+      success = true;
+    } finally {
+      if (!success)
+        finishAddIndexes();
+    }
   }
 
   /*
@@ -2536,6 +2639,12 @@
     autoCommit = localAutoCommit;
     docWriter.setFlushedDocCount(localFlushedDocCount);
 
+    // Must finish merges before rolling back segmentInfos
+    // so merges don't hit exceptions on trying to commit
+    // themselves, don't get files deleted out from under
+    // them, etc:
+    finishMerges(false);
+
     // Keep the same segmentInfos instance but replace all
     // of its SegmentInfo instances.  This is so the next
     // attempt to commit using this instance of IndexWriter
@@ -2544,6 +2653,11 @@
     segmentInfos.addAll(localRollbackSegmentInfos);
     localRollbackSegmentInfos = null;
 
+    // This must come after we rollback segmentInfos, so
+    // that if a commit() kicks off it does not see the
+    // segmentInfos with external segments
+    finishAddIndexes();
+
     // Ask deleter to locate unreferenced files we had
     // created & remove them:
     deleter.checkpoint(segmentInfos, false);
@@ -2552,9 +2666,16 @@
       // Remove the incRef we did in startTransaction:
       deleter.decRef(segmentInfos);
 
+    // Also ask deleter to remove any newly created files
+    // that were never incref'd; this "garbage" is created
+    // when a merge kicks off but aborts part way through
+    // before it had a chance to incRef the files it had
+    // partially created
     deleter.refresh();
-    finishMerges(false);
-    stopMerges = false;
+    
+    notifyAll();
+
+    assert !hasExternalSegments();
   }
 
   /*
@@ -2590,6 +2711,10 @@
       deleter.decRef(localRollbackSegmentInfos);
 
     localRollbackSegmentInfos = null;
+
+    assert !hasExternalSegments();
+
+    finishAddIndexes();
   }
 
   /**
@@ -2626,6 +2751,8 @@
 
     boolean success = false;
 
+    docWriter.pauseAllThreads();
+
     try {
       finishMerges(false);
 
@@ -2651,6 +2778,8 @@
         // once").
         segmentInfos.clear();
         segmentInfos.addAll(rollbackSegmentInfos);
+
+        assert !hasExternalSegments();
         
         docWriter.abort();
 
@@ -2671,6 +2800,7 @@
     } finally {
       synchronized(this) {
         if (!success) {
+          docWriter.resumeAllThreads();
           closing = false;
           notifyAll();
           if (infoStream != null)
@@ -2706,6 +2836,12 @@
         merge.abort();
       }
 
+      // Ensure any running addIndexes finishes.  It's fine
+      // if a new one attempts to start because its merges
+      // will quickly see the stopMerges == true and abort.
+      acquireRead();
+      releaseRead();
+
       // These merges periodically check whether they have
       // been aborted, and stop if so.  We wait here to make
       // sure they all stop.  It should not take very long
@@ -2714,25 +2850,27 @@
       while(runningMerges.size() > 0) {
         if (infoStream != null)
           message("now wait for " + runningMerges.size() + " running merge to abort");
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+        doWait();
       }
 
+      stopMerges = false;
+      notifyAll();
+
       assert 0 == mergingSegments.size();
 
       if (infoStream != null)
         message("all running merges have aborted");
 
     } else {
-      while(pendingMerges.size() > 0 || runningMerges.size() > 0) {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-        }
-      }
+      // Ensure any running addIndexes finishes.  It's fine
+      // if a new one attempts to start because from our
+      // caller above the call will see that we are in the
+      // process of closing, and will throw an
+      // AlreadyClosedException.
+      acquireRead();
+      releaseRead();
+      while(pendingMerges.size() > 0 || runningMerges.size() > 0)
+        doWait();
       assert 0 == mergingSegments.size();
     }
   }
@@ -2747,6 +2885,31 @@
     deleter.checkpoint(segmentInfos, false);
   }
 
+  private void finishAddIndexes() {
+    releaseWrite();
+  }
+
+  private void blockAddIndexes(boolean includePendingClose) {
+
+    acquireRead();
+
+    boolean success = false;
+    try {
+
+      // Make sure we are still open since we could have
+      // waited quite a while for last addIndexes to finish
+      ensureOpen(includePendingClose);
+      success = true;
+    } finally {
+      if (!success)
+        releaseRead();
+    }
+  }
+
+  private void resumeAddIndexes() {
+    releaseRead();
+  }
+
   /** Merges all segments from an array of indexes into this index.
    * @deprecated Use {@link #addIndexesNoOptimize} instead,
    * then separately call {@link #optimize} afterwards if
@@ -2758,6 +2921,8 @@
     throws CorruptIndexException, IOException {
 
     ensureOpen();
+    
+    noDupDirs(dirs);
 
     // Do not allow add docs or deletes while we are running:
     docWriter.pauseAllThreads();
@@ -2770,18 +2935,20 @@
 
       boolean success = false;
 
-      startTransaction();
+      startTransaction(false);
 
       try {
 
         int docCount = 0;
         synchronized(this) {
+          ensureOpen();
           for (int i = 0; i < dirs.length; i++) {
             SegmentInfos sis = new SegmentInfos();	  // read infos from dir
             sis.read(dirs[i]);
             for (int j = 0; j < sis.size(); j++) {
               final SegmentInfo info = sis.info(j);
               docCount += info.docCount;
+              assert !segmentInfos.contains(info);
               segmentInfos.addElement(info);	  // add each info
             }
           }
@@ -2813,6 +2980,17 @@
     mergeGen++;
   }
 
+  private void noDupDirs(Directory[] dirs) {
+    HashSet dups = new HashSet();
+    for(int i=0;i<dirs.length;i++) {
+      if (dups.contains(dirs[i]))
+        throw new IllegalArgumentException("Directory " + dirs[i] + " appears more than once");
+      if (dirs[i] == directory)
+        throw new IllegalArgumentException("Cannot add directory to itself");
+      dups.add(dirs[i]);
+    }
+  }
+
   /**
    * Merges all segments from an array of indexes into this
    * index.
@@ -2864,6 +3042,8 @@
 
     ensureOpen();
 
+    noDupDirs(dirs);
+
     // Do not allow add docs or deletes while we are running:
     docWriter.pauseAllThreads();
 
@@ -2874,12 +3054,14 @@
 
       boolean success = false;
 
-      startTransaction();
+      startTransaction(false);
 
       try {
 
         int docCount = 0;
         synchronized(this) {
+          ensureOpen();
+
           for (int i = 0; i < dirs.length; i++) {
             if (directory == dirs[i]) {
               // cannot add this index: segments may be deleted in merge before added
@@ -2890,6 +3072,7 @@
             sis.read(dirs[i]);
             for (int j = 0; j < sis.size(); j++) {
               SegmentInfo info = sis.info(j);
+              assert !segmentInfos.contains(info): "dup info dir=" + info.dir + " name=" + info.name;
               docCount += info.docCount;
               segmentInfos.addElement(info); // add each info
             }
@@ -2901,12 +3084,16 @@
 
         maybeMerge();
 
+        ensureOpen();
+
         // If after merging there remain segments in the index
         // that are in a different directory, just copy these
         // over into our index.  This is necessary (before
         // finishing the transaction) to avoid leaving the
         // index in an unusable (inconsistent) state.
-        copyExternalSegments();
+        resolveExternalSegments();
+
+        ensureOpen();
 
         success = true;
 
@@ -2925,47 +3112,85 @@
     }
   }
 
+  private boolean hasExternalSegments() {
+    return hasExternalSegments(segmentInfos);
+  }
+
+  private boolean hasExternalSegments(SegmentInfos infos) {
+    final int numSegments = infos.size();
+    for(int i=0;i<numSegments;i++)
+      if (infos.info(i).dir != directory)
+        return true;
+    return false;
+  }
+
   /* If any of our segments are using a directory != ours
-   * then copy them over.  Currently this is only used by
+   * then we have to either copy them over one by one, merge
+   * them (if merge policy has chosen to) or wait until
+   * currently running merges (in the background) complete.
+   * We don't return until the SegmentInfos has no more
+   * external segments.  Currently this is only used by
    * addIndexesNoOptimize(). */
-  private void copyExternalSegments() throws CorruptIndexException, IOException {
+  private void resolveExternalSegments() throws CorruptIndexException, IOException {
 
     boolean any = false;
 
-    while(true) {
+    boolean done = false;
+
+    while(!done) {
       SegmentInfo info = null;
       MergePolicy.OneMerge merge = null;
       synchronized(this) {
+
+        if (stopMerges)
+          throw new MergePolicy.MergeAbortedException("rollback() was called or addIndexes* hit an unhandled exception");
+
         final int numSegments = segmentInfos.size();
+
+        done = true;
         for(int i=0;i<numSegments;i++) {
           info = segmentInfos.info(i);
           if (info.dir != directory) {
-            merge = new MergePolicy.OneMerge(segmentInfos.range(i, 1+i), info.getUseCompoundFile());
-            break;
+            done = false;
+            final MergePolicy.OneMerge newMerge = new MergePolicy.OneMerge(segmentInfos.range(i, 1+i), info.getUseCompoundFile());
+
+            // Returns true if no running merge conflicts
+            // with this one (and, records this merge as
+            // pending), ie, this segment is not currently
+            // being merged:
+            if (registerMerge(newMerge)) {
+              merge = newMerge;
+
+              // If this segment is not currently being
+              // merged, then advance it to running & run
+              // the merge ourself (below):
+              pendingMerges.remove(merge);
+              runningMerges.add(merge);
+              break;
+            }
           }
         }
+
+        if (!done && merge == null)
+          // We are not yet done (external segments still
+          // exist in segmentInfos), yet, all such segments
+          // are currently "covered" by a pending or running
+          // merge.  We now try to grab any pending merge
+          // that involves external segments:
+          merge = getNextExternalMerge();
+
+        if (!done && merge == null)
+          // We are not yet done, and, all external segments
+          // fall under merges that the merge scheduler is
+          // currently running.  So, we now wait and check
+          // back to see if the merge has completed.
+          doWait();
       }
 
       if (merge != null) {
-        if (registerMerge(merge)) {
-          pendingMerges.remove(merge);
-          runningMerges.add(merge);
-          any = true;
-          merge(merge);
-        } else
-          // This means there is a bug in the
-          // MergeScheduler.  MergeSchedulers in general are
-          // not allowed to run a merge involving segments
-          // external to this IndexWriter's directory in the
-          // background because this would put the index
-          // into an inconsistent state (where segmentInfos
-          // has been written with such external segments
-          // that an IndexReader would fail to load).
-          throw new MergePolicy.MergeException("segment \"" + info.name + " exists in external directory yet the MergeScheduler executed the merge in a separate thread",
-                                               directory);
-      } else
-        // No more external segments
-        break;
+        any = true;
+        merge(merge);
+      }
     }
 
     if (any)
@@ -3003,31 +3228,54 @@
     // Do not allow add docs or deletes while we are running:
     docWriter.pauseAllThreads();
 
+    // We must pre-acquire the write lock here (and not in
+    // startTransaction below) so that no other addIndexes
+    // is allowed to start up after we have flushed &
+    // optimized but before we then start our transaction.
+    // This is because the merging below requires that only
+    // one segment is present in the index:
+    acquireWrite();
+
     try {
-      optimize();					  // start with zero or 1 seg
 
-      final String mergedName = newSegmentName();
-      SegmentMerger merger = new SegmentMerger(this, mergedName, null);
+      boolean success = false;
+      SegmentInfo info = null;
+      String mergedName = null;
+      SegmentMerger merger = null;
+
+      try {
+        flush(true, false, true);
+        optimize();					  // start with zero or 1 seg
+        success = true;
+      } finally {
+        // Take care to release the write lock if we hit an
+        // exception before starting the transaction
+        if (!success)
+          releaseWrite();
+      }
 
-      SegmentInfo info;
+      // true means we already have write lock; if this call
+      // hits an exception it will release the write lock:
+      startTransaction(true);
 
-      IndexReader sReader = null;
       try {
+        mergedName = newSegmentName();
+        merger = new SegmentMerger(this, mergedName, null);
+
+        IndexReader sReader = null;
         synchronized(this) {
-          if (segmentInfos.size() == 1){ // add existing index, if any
+          if (segmentInfos.size() == 1) { // add existing index, if any
             sReader = SegmentReader.get(true, segmentInfos.info(0));
-            merger.add(sReader);
           }
         }
 
-        for (int i = 0; i < readers.length; i++)      // add new indexes
-          merger.add(readers[i]);
-
-        boolean success = false;
+        try {
+          if (sReader != null)
+            merger.add(sReader);
 
-        startTransaction();
+          for (int i = 0; i < readers.length; i++)      // add new indexes
+            merger.add(readers[i]);
 
-        try {
           int docCount = merger.merge();                // merge 'em
 
           if(sReader != null) {
@@ -3048,43 +3296,61 @@
           success = true;
 
         } finally {
-          if (!success) {
-            if (infoStream != null)
-              message("hit exception in addIndexes during merge");
-
-            rollbackTransaction();
-          } else {
-            commitTransaction();
+          if (sReader != null) {
+            sReader.close();
           }
         }
       } finally {
-        if (sReader != null) {
-          sReader.close();
+        if (!success) {
+          if (infoStream != null)
+            message("hit exception in addIndexes during merge");
+          rollbackTransaction();
+        } else {
+          commitTransaction();
         }
       }
     
       if (mergePolicy instanceof LogMergePolicy && getUseCompoundFile()) {
 
-        boolean success = false;
-
-        startTransaction();
+        List files = null;
 
-        try {
-          merger.createCompoundFile(mergedName + ".cfs");
-          synchronized(this) {
-            info.setUseCompoundFile(true);
+        synchronized(this) {
+          // Must incRef our files so that if another thread
+          // is running merge/optimize, it doesn't delete our
+          // segment's files before we have a chance to
+          // finish making the compound file.
+          if (segmentInfos.contains(info)) {
+            files = info.files();
+            deleter.incRef(files);
           }
+        }
+
+        if (files != null) {
+
+          success = false;
+
+          startTransaction(false);
+
+          try {
+            merger.createCompoundFile(mergedName + ".cfs");
+            synchronized(this) {
+              info.setUseCompoundFile(true);
+            }
           
-          success = true;
+            success = true;
           
-        } finally {
-          if (!success) {
-            if (infoStream != null)
-              message("hit exception building compound file in addIndexes during merge");
+          } finally {
+
+            deleter.decRef(files);
+
+            if (!success) {
+              if (infoStream != null)
+                message("hit exception building compound file in addIndexes during merge");
 
-            rollbackTransaction();
-          } else {
-            commitTransaction();
+              rollbackTransaction();
+            } else {
+              commitTransaction();
+            }
           }
         }
       }
@@ -3133,6 +3399,7 @@
    * will internally call prepareCommit.
    */
   public final void prepareCommit() throws CorruptIndexException, IOException {
+    ensureOpen();
     prepareCommit(false);
   }
 
@@ -3182,17 +3449,41 @@
    * loss it may still lose data.  Lucene cannot guarantee
    * consistency on such devices.  </p>
    */
+
+  private boolean committing;
+
+  synchronized private void waitForCommit() {
+    // Only allow a single thread to do the commit, at a time:
+    while(committing)
+      doWait();
+    committing = true;
+  }
+
+  synchronized private void doneCommit() {
+    committing = false;
+    notifyAll();
+  }
+
   public final void commit() throws CorruptIndexException, IOException {
 
-    message("commit: start");
+    ensureOpen();
 
-    if (autoCommit || pendingCommit == null) {
-      message("commit: now prepare");
-      prepareCommit(true);
-    } else
-      message("commit: already prepared");
+    // Only let one thread do the prepare/finish at a time
+    waitForCommit();
 
-    finishCommit();
+    try {
+      message("commit: start");
+
+      if (autoCommit || pendingCommit == null) {
+        message("commit: now prepare");
+        prepareCommit(true);
+      } else
+        message("commit: already prepared");
+
+      finishCommit();
+    } finally {
+      doneCommit();
+    }
   }
 
   private synchronized final void finishCommit() throws CorruptIndexException, IOException {
@@ -3203,7 +3494,7 @@
         pendingCommit.finishCommit(directory);
         lastCommitChangeCount = pendingCommitChangeCount;
         segmentInfos.updateGeneration(pendingCommit);
-        setRollbackSegmentInfos();
+        setRollbackSegmentInfos(pendingCommit);
         deleter.checkpoint(pendingCommit, true);
       } finally {
         deleter.decRef(pendingCommit);
@@ -3228,7 +3519,8 @@
    *  be flushed
    */
   protected final void flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {
-    ensureOpen();
+    // We can be called during close, when closing==true, so we must pass false to ensureOpen:
+    ensureOpen(false);
     if (doFlush(flushDocStores, flushDeletes) && triggerMerge)
       maybeMerge();
   }
@@ -3238,12 +3530,14 @@
   // even while a flush is happening
   private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException {
 
-    // Make sure no threads are actively adding a document
+    ensureOpen(false);
 
     assert testPoint("startDoFlush");
 
     flushCount++;
 
+    // Make sure no threads are actively adding a document
+
     flushDeletes |= docWriter.deletesFull();
 
     // When autoCommit=true we must always flush deletes
@@ -3425,7 +3719,7 @@
         if (segmentInfos.indexOf(info) == -1)
           throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index", directory);
         else
-          throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle",
+          throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge.segString(directory) + " vs " + segString() + "), which IndexWriter (currently) cannot handle",
                                                directory);
       }
     }
@@ -3439,7 +3733,7 @@
    *  deletes may have been flushed to the segments since
    *  the merge was started.  This method "carries over"
    *  such new deletes onto the newly merged segment, and
-   *  saves the results deletes file (incrementing the
+   *  saves the resulting deletes file (incrementing the
    *  delete generation for merge.info).  If no deletes were
    *  flushed, no new deletes file is saved. */
   synchronized private void commitMergedDeletes(MergePolicy.OneMerge merge) throws IOException {
@@ -3542,7 +3836,7 @@
       return false;
 
     if (infoStream != null)
-      message("commitMerge: " + merge.segString(directory));
+      message("commitMerge: " + merge.segString(directory) + " index=" + segString());
 
     assert merge.registerDone;
 
@@ -3588,6 +3882,7 @@
     merge.info.setHasProx(merger.hasProx());
 
     segmentInfos.subList(start, start + merge.segments.size()).clear();
+    assert !segmentInfos.contains(merge.info);
     segmentInfos.add(start, merge.info);
 
     // Must checkpoint before decrefing so any newly
@@ -3632,13 +3927,14 @@
           mergeInit(merge);
 
           if (infoStream != null)
-            message("now merge\n  merge=" + merge.segString(directory) + "\n  index=" + segString());
+            message("now merge\n  merge=" + merge.segString(directory) + "\n  merge=" + merge + "\n  index=" + segString());
 
           mergeMiddle(merge);
           success = true;
         } catch (MergePolicy.MergeAbortedException e) {
           merge.setException(e);
           addMergeException(merge);
+
           // We can ignore this exception, unless the merge
           // involves segments from external directories, in
           // which case we must throw it so, for example, the
@@ -3668,10 +3964,6 @@
               updatePendingMerges(merge.maxNumSegmentsOptimize, merge.optimize);
           } finally {
             runningMerges.remove(merge);
-            // Optimize may be waiting on the final optimize
-            // merge to finish; and finishMerges() may be
-            // waiting for all merges to finish:
-            notifyAll();
           }
         }
       }
@@ -3687,11 +3979,16 @@
    *  are now participating in a merge, and true is
    *  returned.  Else (the merge conflicts) false is
    *  returned. */
-  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) {
+  final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws MergePolicy.MergeAbortedException {
 
     if (merge.registerDone)
       return true;
 
+    if (stopMerges) {
+      merge.abort();
+      throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString(directory));
+    }
+
     final int count = merge.segments.size();
     boolean isExternal = false;
     for(int i=0;i<count;i++) {
@@ -3704,6 +4001,8 @@
         isExternal = true;
     }
 
+    ensureContiguousMerge(merge);
+
     pendingMerges.add(merge);
 
     if (infoStream != null)
@@ -3928,6 +4227,10 @@
   /** Does fininishing for a merge, which is fast but holds
    *  the synchronized lock on IndexWriter instance. */
   final synchronized void mergeFinish(MergePolicy.OneMerge merge) throws IOException {
+    
+    // Optimize, addIndexes or finishMerges may be waiting
+    // on merges to finish.
+    notifyAll();
 
     if (merge.increfDone)
       decrefMergeSegments(merge);
@@ -4021,6 +4324,16 @@
       try {
         merger.createCompoundFile(compoundFileName);
         success = true;
+      } catch (IOException ioe) {
+        synchronized(this) {
+          if (merge.isAborted()) {
+            // This can happen if rollback or close(false)
+            // is called -- fall through to logic below to
+            // remove the partially created CFS:
+            success = true;
+          } else
+            throw ioe;
+        }
       } finally {
         if (!success) {
           if (infoStream != null)
@@ -4139,7 +4452,10 @@
       if (i > 0) {
         buffer.append(' ');
       }
-      buffer.append(infos.info(i).segString(directory));
+      final SegmentInfo info = infos.info(i);
+      buffer.append(info.segString(directory));
+      if (info.dir != directory)
+        buffer.append("**");
     }
     return buffer.toString();
   }
@@ -4227,6 +4543,20 @@
     }
   }
 
+  private synchronized void doWait() {
+    try {
+      // NOTE: the callers of this method should in theory
+      // be able to do simply wait(), but, as a defense
+      // against thread timing hazards where notifyAll()
+      // fails to be called, we wait for at most 1 second
+      // and then return so caller can check if wait
+      // conditions are satisfied:
+      wait(1000);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /** Walk through all files referenced by the current
    *  segmentInfos and ask the Directory to sync each file,
    *  if it wasn't already.  If that succeeds, then we
@@ -4252,26 +4582,38 @@
 
       synchronized(this) {
 
-        assert lastCommitChangeCount <= changeCount;
+        // Wait for any running addIndexes to complete
+        // first, then block any from running until we've
+        // copied the segmentInfos we intend to sync:
+        blockAddIndexes(false);
 
-        if (changeCount == lastCommitChangeCount) {
-          if (infoStream != null)
-            message("  skip startCommit(): no changes pending");
-          return;
-        }
+        assert !hasExternalSegments();
 
-        // First, we clone & incref the segmentInfos we intend
-        // to sync, then, without locking, we sync() each file
-        // referenced by toSync, in the background.  Multiple
-        // threads can be doing this at once, if say a large
-        // merge and a small merge finish at the same time:
+        try {
 
-        if (infoStream != null)
-          message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
+          assert lastCommitChangeCount <= changeCount;
+
+          if (changeCount == lastCommitChangeCount) {
+            if (infoStream != null)
+              message("  skip startCommit(): no changes pending");
+            return;
+          }
 
-        toSync = (SegmentInfos) segmentInfos.clone();
-        deleter.incRef(toSync, false);
-        myChangeCount = changeCount;
+          // First, we clone & incref the segmentInfos we intend
+          // to sync, then, without locking, we sync() each file
+          // referenced by toSync, in the background.  Multiple
+          // threads can be doing this at once, if say a large
+          // merge and a small merge finish at the same time:
+
+          if (infoStream != null)
+            message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
+
+          toSync = (SegmentInfos) segmentInfos.clone();
+          deleter.incRef(toSync, false);
+          myChangeCount = changeCount;
+        } finally {
+          resumeAddIndexes();
+        }
       }
 
       assert testPoint("midStartCommit");
@@ -4295,7 +4637,7 @@
                 try {
                   // Because we incRef'd this commit point, above,
                   // the file had better exist:
-                  assert directory.fileExists(fileName);
+                  assert directory.fileExists(fileName): "file '" + fileName + "' does not exist dir=" + directory;
                   message("now sync " + fileName);
                   directory.sync(fileName);
                   success = true;
@@ -4328,11 +4670,7 @@
             // Wait now for any current pending commit to complete:
             while(pendingCommit != null) {
               message("wait for existing pendingCommit to finish...");
-              try {
-                wait();
-              } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-              }
+              doWait();
             }
 
             if (segmentInfos.getGeneration() > toSync.getGeneration())

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java?rev=690537&r1=690536&r2=690537&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java Sat Aug 30 10:16:29 2008
@@ -30,8 +30,7 @@
 import java.util.Vector;
 
 final class SegmentInfos extends Vector {
-  
-  
+
   /** The file format version, a negative number. */
   /* Works since counter, the old 1st entry, is always >= 0 */
   public static final int FORMAT = -1;
@@ -826,4 +825,19 @@
     prepareCommit(dir);
     finishCommit(dir);
   }
+
+  synchronized String segString(Directory directory) {
+    StringBuffer buffer = new StringBuffer();
+    final int count = size();
+    for(int i = 0; i < count; i++) {
+      if (i > 0) {
+        buffer.append(' ');
+      }
+      final SegmentInfo info = info(i);
+      buffer.append(info.segString(directory));
+      if (info.dir != directory)
+        buffer.append("**");
+    }
+    return buffer.toString();
+  }
 }

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java?rev=690537&r1=690536&r2=690537&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestAddIndexesNoOptimize.java Sat Aug 30 10:16:29 2008
@@ -340,7 +340,7 @@
     writer.setMaxBufferedDocs(10);
     writer.setMergeFactor(4);
 
-    writer.addIndexesNoOptimize(new Directory[] { aux, aux });
+    writer.addIndexesNoOptimize(new Directory[] { aux, new RAMDirectory(aux) });
     assertEquals(1060, writer.docCount());
     assertEquals(1000, writer.getDocCount(0));
     writer.close();
@@ -369,7 +369,7 @@
     writer.setMaxBufferedDocs(4);
     writer.setMergeFactor(4);
 
-    writer.addIndexesNoOptimize(new Directory[] { aux, aux });
+    writer.addIndexesNoOptimize(new Directory[] { aux, new RAMDirectory(aux) });
     assertEquals(1020, writer.docCount());
     assertEquals(1000, writer.getDocCount(0));
     writer.close();

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=690537&r1=690536&r2=690537&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java Sat Aug 30 10:16:29 2008
@@ -22,6 +22,7 @@
 import java.io.File;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.lucene.util.LuceneTestCase;
@@ -125,7 +126,7 @@
         writer.close();
     }
 
-    private void addDoc(IndexWriter writer) throws IOException
+    private static void addDoc(IndexWriter writer) throws IOException
     {
         Document doc = new Document();
         doc.add(new Field("content", "aaa", Field.Store.NO, Field.Index.TOKENIZED));
@@ -437,6 +438,10 @@
                        (dir.getMaxUsedSizeInBytes()-startDiskUsage) < 2*(startDiskUsage + inputDiskUsage));
           }
 
+          // Make sure we don't hit disk full during close below:
+          dir.setMaxSizeInBytes(0);
+          dir.setRandomIOExceptionRate(0.0, 0);
+
           writer.close();
 
           // Wait for all BG threads to finish else
@@ -2338,6 +2343,9 @@
       IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
       ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
 
+      // We expect AlreadyClosedException
+      cms.setSuppressExceptions();
+
       writer.setMergeScheduler(cms);
       writer.setMaxBufferedDocs(10);
       writer.setMergeFactor(4);
@@ -2891,7 +2899,7 @@
       writer.setMergeScheduler(new SerialMergeScheduler());
       writer.setMergePolicy(new LogDocMergePolicy());
 
-      Directory[] indexDirs = { dir};
+      Directory[] indexDirs = {new MockRAMDirectory(dir)};
       writer.addIndexes(indexDirs);
       writer.close();
     }
@@ -3732,6 +3740,278 @@
     dir.close();
   }
 
+  private abstract static class RunAddIndexesThreads {
+
+    Directory dir, dir2;
+    final static int NUM_INIT_DOCS = 17;
+    IndexWriter writer2;
+    final List failures = new ArrayList();
+    volatile boolean didClose;
+    final IndexReader[] readers;
+    final int NUM_COPY;
+    final static int NUM_THREADS = 5;
+    final Thread[] threads = new Thread[NUM_THREADS];
+    final ConcurrentMergeScheduler cms;
+
+    public RunAddIndexesThreads(int numCopy) throws Throwable {
+      NUM_COPY = numCopy;
+      dir = new MockRAMDirectory();
+      IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+      writer.setMaxBufferedDocs(2);
+      for (int i = 0; i < NUM_INIT_DOCS; i++)
+        addDoc(writer);
+      writer.close();
+
+      dir2 = new MockRAMDirectory();
+      writer2 = new IndexWriter(dir2, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+      cms = (ConcurrentMergeScheduler) writer2.getMergeScheduler();
+
+      readers = new IndexReader[NUM_COPY];
+      for(int i=0;i<NUM_COPY;i++)
+        readers[i] = IndexReader.open(dir);
+    }
+
+    void launchThreads(final int numIter) {
+
+      for(int i=0;i<NUM_THREADS;i++) {
+        threads[i] = new Thread() {
+            public void run() {
+              try {
+
+                final Directory[] dirs = new Directory[NUM_COPY];
+                for(int k=0;k<NUM_COPY;k++)
+                  dirs[k] = new MockRAMDirectory(dir);
+
+                int j=0;
+
+                while(true) {
+                  // System.out.println(Thread.currentThread().getName() + ": iter j=" + j);
+                  if (numIter > 0 && j == numIter)
+                    break;
+                  doBody(j++, dirs);
+                }
+              } catch (Throwable t) {
+                handle(t);
+              }
+            }
+          };
+      }
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i].start();
+    }
+
+    void joinThreads() {
+      for(int i=0;i<NUM_THREADS;i++)
+        try {
+          threads[i].join();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+    }
+
+    void close(boolean doWait) throws Throwable {
+      didClose = true;
+      writer2.close(doWait);
+    }
+
+    void closeDir() throws Throwable {
+      for(int i=0;i<NUM_COPY;i++)
+        readers[i].close();
+      dir2.close();
+    }
+
+    abstract void doBody(int j, Directory[] dirs) throws Throwable;
+    abstract void handle(Throwable t);
+  }
+
+  private class CommitAndAddIndexes extends RunAddIndexesThreads {
+    public CommitAndAddIndexes(int numCopy) throws Throwable {
+      super(numCopy);
+    }
+
+    void handle(Throwable t) {
+      t.printStackTrace(System.out);
+      synchronized(failures) {
+        failures.add(t);
+      }
+    }
+
+    void doBody(int j, Directory[] dirs) throws Throwable {
+      switch(j%4) {
+      case 0:
+        writer2.addIndexes(dirs);
+        break;
+      case 1:
+        writer2.addIndexesNoOptimize(dirs);
+        break;
+      case 2:
+        writer2.addIndexes(readers);
+        break;
+      case 3:
+        writer2.commit();
+      }
+    }
+  }
+
+  // LUCENE-1335: test simultaneous addIndexes & commits
+  // from multiple threads
+  public void testAddIndexesWithThreads() throws Throwable {
+
+    final int NUM_ITER = 12;
+    final int NUM_COPY = 3;
+    CommitAndAddIndexes c = new CommitAndAddIndexes(NUM_COPY);
+    c.launchThreads(NUM_ITER);
+
+    for(int i=0;i<100;i++)
+      addDoc(c.writer2);
+
+    c.joinThreads();
+
+    assertEquals(100+NUM_COPY*(3*NUM_ITER/4)*c.NUM_THREADS*c.NUM_INIT_DOCS, c.writer2.numDocs());
+
+    c.close(true);
+
+    assertTrue(c.failures.size() == 0);
+
+    _TestUtil.checkIndex(c.dir2);
+
+    IndexReader reader = IndexReader.open(c.dir2);
+    assertEquals(100+NUM_COPY*(3*NUM_ITER/4)*c.NUM_THREADS*c.NUM_INIT_DOCS, reader.numDocs());
+    reader.close();
+
+    c.closeDir();
+  }
+
+  private class CommitAndAddIndexes2 extends CommitAndAddIndexes {
+    public CommitAndAddIndexes2(int numCopy) throws Throwable {
+      super(numCopy);
+    }
+
+    void handle(Throwable t) {
+      if (!(t instanceof AlreadyClosedException) && !(t instanceof NullPointerException)) {
+        t.printStackTrace(System.out);
+        synchronized(failures) {
+          failures.add(t);
+        }
+      }
+    }
+  }
+
+  // LUCENE-1335: test simultaneous addIndexes & close
+  public void testAddIndexesWithClose() throws Throwable {
+    final int NUM_COPY = 3;
+    CommitAndAddIndexes2 c = new CommitAndAddIndexes2(NUM_COPY);
+    //c.writer2.setInfoStream(System.out);
+    c.launchThreads(-1);
+
+    // Close w/o first stopping/joining the threads
+    c.close(true);
+    //c.writer2.close();
+
+    c.joinThreads();
+
+    _TestUtil.checkIndex(c.dir2);
+
+    c.closeDir();
+
+    assertTrue(c.failures.size() == 0);
+  }
+
+  private class CommitAndAddIndexes3 extends RunAddIndexesThreads {
+    public CommitAndAddIndexes3(int numCopy) throws Throwable {
+      super(numCopy);
+    }
+
+    void doBody(int j, Directory[] dirs) throws Throwable {
+      switch(j%5) {
+      case 0:
+        writer2.addIndexes(dirs);
+        break;
+      case 1:
+        writer2.addIndexesNoOptimize(dirs);
+        break;
+      case 2:
+        writer2.addIndexes(readers);
+        break;
+      case 3:
+        writer2.optimize();
+      case 4:
+        writer2.commit();
+      }
+    }
+
+    void handle(Throwable t) {
+      boolean report = true;
+
+      if (t instanceof AlreadyClosedException || t instanceof MergePolicy.MergeAbortedException || t instanceof NullPointerException) {
+        report = !didClose;
+      } else if (t instanceof IOException)  {
+        Throwable t2 = t.getCause();
+        if (t2 instanceof MergePolicy.MergeAbortedException) {
+          report = !didClose;
+        }
+      }
+      if (report) {
+        t.printStackTrace(System.out);
+        synchronized(failures) {
+          failures.add(t);
+        }
+      }
+    }
+  }
+
+  // LUCENE-1335: test simultaneous addIndexes & close(false)
+  public void testAddIndexesWithCloseNoWait() throws Throwable {
+
+    final int NUM_COPY = 50;
+    CommitAndAddIndexes3 c = new CommitAndAddIndexes3(NUM_COPY);
+    c.launchThreads(-1);
+
+    try {
+      Thread.sleep(500);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+
+    // Close w/o first stopping/joining the threads
+    c.close(false);
+
+    c.joinThreads();
+
+    _TestUtil.checkIndex(c.dir2);
+
+    c.closeDir();
+
+    assertTrue(c.failures.size() == 0);
+  }
+
+  // LUCENE-1335: test simultaneous addIndexes & rollback
+  public void testAddIndexesWithRollback() throws Throwable {
+    
+    final int NUM_COPY = 50;
+    CommitAndAddIndexes3 c = new CommitAndAddIndexes3(NUM_COPY);
+    c.launchThreads(-1);
+
+    try {
+      Thread.sleep(500);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+
+    // Rollback w/o first stopping/joining the threads
+    c.didClose = true;
+    c.writer2.rollback();
+
+    c.joinThreads();
+
+    _TestUtil.checkIndex(c.dir2);
+
+    c.closeDir();
+
+    assertTrue(c.failures.size() == 0);
+  }
+
   // LUCENE-1347
   public class MockIndexWriter4 extends IndexWriter {
 



Mime
View raw message