Author: mikemccand
Date: Tue Mar 11 05:05:43 2008
New Revision: 635896
URL: http://svn.apache.org/viewvc?rev=635896&view=rev
Log:
LUCENE-1200 (porting to 2.3): fix rare deadlock case in IndexWriter.addIndexes*
Modified:
lucene/java/branches/lucene_2_3/CHANGES.txt
lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/IndexWriter.java
Modified: lucene/java/branches/lucene_2_3/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/branches/lucene_2_3/CHANGES.txt?rev=635896&r1=635895&r2=635896&view=diff
==============================================================================
--- lucene/java/branches/lucene_2_3/CHANGES.txt (original)
+++ lucene/java/branches/lucene_2_3/CHANGES.txt Tue Mar 11 05:05:43 2008
@@ -16,6 +16,9 @@
4. LUCENE-1199: Added defensive check for null indexReader before
calling close in IndexModifier.close() (Mike McCandless)
+ 5. LUCENE-1200: Fix rare deadlock case in addIndexes* when
+ ConcurrentMergeScheduler is in use (Mike McCandless)
+
======================= Release 2.3.1 2008-02-22 =======================
Bug fixes
Modified: lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/IndexWriter.java?rev=635896&r1=635895&r2=635896&view=diff
==============================================================================
--- lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/IndexWriter.java Tue Mar 11 05:05:43 2008
@@ -1887,7 +1887,7 @@
* within the transactions, so they must be flushed before the
* transaction is started.
*/
- private void startTransaction() throws IOException {
+ private synchronized void startTransaction() throws IOException {
if (infoStream != null)
message("now start transaction");
@@ -1918,7 +1918,7 @@
* Rolls back the transaction and restores state to where
* we were at the start.
*/
- private void rollbackTransaction() throws IOException {
+ private synchronized void rollbackTransaction() throws IOException {
if (infoStream != null)
message("now rollback transaction");
@@ -1952,7 +1952,7 @@
* segments file and remove and pending deletions we have
* accumulated during the transaction
*/
- private void commitTransaction() throws IOException {
+ private synchronized void commitTransaction() throws IOException {
if (infoStream != null)
message("now commit transaction");
@@ -2130,6 +2130,10 @@
* each input Directory, so it is up to the caller to
* enforce this.
*
+ * <p><b>NOTE:</b> while this is running, any attempts to
+ * add or delete documents (with another thread) will be
+ * paused until this method completes.
+ *
* <p>After this completes, the index is optimized.
*
* <p>This method is transactional in how Exceptions are
@@ -2168,11 +2172,16 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void addIndexes(Directory[] dirs)
+ public void addIndexes(Directory[] dirs)
throws CorruptIndexException, IOException {
ensureOpen();
+
+ // Do not allow add docs or deletes while we are running:
+ docWriter.pauseAllThreads();
+
try {
+
if (infoStream != null)
message("flush at addIndexes");
flush();
@@ -2182,11 +2191,15 @@
startTransaction();
try {
- for (int i = 0; i < dirs.length; i++) {
- SegmentInfos sis = new SegmentInfos(); // read infos from dir
- sis.read(dirs[i]);
- for (int j = 0; j < sis.size(); j++) {
- segmentInfos.addElement(sis.info(j)); // add each info
+
+ synchronized(this) {
+ for (int i = 0; i < dirs.length; i++) {
+ SegmentInfos sis = new SegmentInfos(); // read infos from dir
+ sis.read(dirs[i]);
+ for (int j = 0; j < sis.size(); j++) {
+ final SegmentInfo info = sis.info(j);
+ segmentInfos.addElement(sis.info(j)); // add each info
+ }
}
}
@@ -2203,6 +2216,8 @@
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
+ } finally {
+ docWriter.resumeAllThreads();
}
}
@@ -2224,6 +2239,10 @@
* each input Directory, so it is up to the caller to
* enforce this.
*
+ * <p><b>NOTE:</b> while this is running, any attempts to
+ * add or delete documents (with another thread) will be
+ * paused until this method completes.
+ *
* <p>
* This requires this index not be among those to be added, and the
* upper bound* of those segment doc counts not exceed maxMergeDocs.
@@ -2235,11 +2254,14 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void addIndexesNoOptimize(Directory[] dirs)
+ public void addIndexesNoOptimize(Directory[] dirs)
throws CorruptIndexException, IOException {
ensureOpen();
+ // Do not allow add docs or deletes while we are running:
+ docWriter.pauseAllThreads();
+
try {
if (infoStream != null)
message("flush at addIndexesNoOptimize");
@@ -2251,17 +2273,19 @@
try {
- for (int i = 0; i < dirs.length; i++) {
- if (directory == dirs[i]) {
- // cannot add this index: segments may be deleted in merge before added
- throw new IllegalArgumentException("Cannot add this index to itself");
- }
-
- SegmentInfos sis = new SegmentInfos(); // read infos from dir
- sis.read(dirs[i]);
- for (int j = 0; j < sis.size(); j++) {
- SegmentInfo info = sis.info(j);
- segmentInfos.addElement(info); // add each info
+ synchronized(this) {
+ for (int i = 0; i < dirs.length; i++) {
+ if (directory == dirs[i]) {
+ // cannot add this index: segments may be deleted in merge before added
+ throw new IllegalArgumentException("Cannot add this index to itself");
+ }
+
+ SegmentInfos sis = new SegmentInfos(); // read infos from dir
+ sis.read(dirs[i]);
+ for (int j = 0; j < sis.size(); j++) {
+ SegmentInfo info = sis.info(j);
+ segmentInfos.addElement(info); // add each info
+ }
}
}
@@ -2286,18 +2310,30 @@
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
+ } finally {
+ docWriter.resumeAllThreads();
}
}
/* If any of our segments are using a directory != ours
* then copy them over. Currently this is only used by
* addIndexesNoOptimize(). */
- private synchronized void copyExternalSegments() throws CorruptIndexException, IOException {
- final int numSegments = segmentInfos.size();
- for(int i=0;i<numSegments;i++) {
- SegmentInfo info = segmentInfos.info(i);
- if (info.dir != directory) {
- MergePolicy.OneMerge merge = new MergePolicy.OneMerge(segmentInfos.range(i, 1+i), info.getUseCompoundFile());
+ private void copyExternalSegments() throws CorruptIndexException, IOException {
+ while(true) {
+ SegmentInfo info = null;
+ MergePolicy.OneMerge merge = null;
+ synchronized(this) {
+ final int numSegments = segmentInfos.size();
+ for(int i=0;i<numSegments;i++) {
+ info = segmentInfos.info(i);
+ if (info.dir != directory) {
+ merge = new MergePolicy.OneMerge(segmentInfos.range(i, 1+i), info.getUseCompoundFile());
+ break;
+ }
+ }
+ }
+
+ if (merge != null) {
if (registerMerge(merge)) {
pendingMerges.remove(merge);
runningMerges.add(merge);
@@ -2312,7 +2348,9 @@
// has been written with such external segments
// that an IndexReader would fail to load).
throw new MergePolicy.MergeException("segment \"" + info.name + " exists in external directory yet the MergeScheduler executed the merge in a separate thread");
- }
+ } else
+ // No more external segments
+ break;
}
}
@@ -2320,6 +2358,16 @@
* <p>After this completes, the index is optimized. </p>
* <p>The provided IndexReaders are not closed.</p>
+ * <p><b>NOTE:</b> the index in each Directory must not be
+ * changed (opened by a writer) while this method is
+ * running. This method does not acquire a write lock in
+ * each input Directory, so it is up to the caller to
+ * enforce this.
+ *
+ * <p><b>NOTE:</b> while this is running, any attempts to
+ * add or delete documents (with another thread) will be
+ * paused until this method completes.
+ *
* <p>See {@link #addIndexes(Directory[])} for
* details on transactional semantics, temporary free
* space required in the Directory, and non-CFS segments
@@ -2327,10 +2375,14 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public synchronized void addIndexes(IndexReader[] readers)
+ public void addIndexes(IndexReader[] readers)
throws CorruptIndexException, IOException {
ensureOpen();
+
+ // Do not allow add docs or deletes while we are running:
+ docWriter.pauseAllThreads();
+
try {
optimize(); // start with zero or 1 seg
@@ -2341,9 +2393,11 @@
IndexReader sReader = null;
try {
- if (segmentInfos.size() == 1){ // add existing index, if any
- sReader = SegmentReader.get(segmentInfos.info(0));
- merger.add(sReader);
+ synchronized(this) {
+ if (segmentInfos.size() == 1){ // add existing index, if any
+ sReader = SegmentReader.get(segmentInfos.info(0));
+ merger.add(sReader);
+ }
}
for (int i = 0; i < readers.length; i++) // add new indexes
@@ -2361,10 +2415,12 @@
sReader = null;
}
- segmentInfos.setSize(0); // pop old infos & add new
- info = new SegmentInfo(mergedName, docCount, directory, false, true,
- -1, null, false);
- segmentInfos.addElement(info);
+ synchronized(this) {
+ segmentInfos.setSize(0); // pop old infos & add new
+ info = new SegmentInfo(mergedName, docCount, directory, false, true,
+ -1, null, false);
+ segmentInfos.addElement(info);
+ }
success = true;
@@ -2392,7 +2448,9 @@
try {
merger.createCompoundFile(mergedName + ".cfs");
- info.setUseCompoundFile(true);
+ synchronized(this) {
+ info.setUseCompoundFile(true);
+ }
} finally {
if (!success) {
if (infoStream != null)
@@ -2407,6 +2465,8 @@
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
+ } finally {
+ docWriter.resumeAllThreads();
}
}
|