From: mikemcc...@apache.org
Subject: svn commit: r620756 - in /lucene/java/branches/lucene_2_3: CHANGES.txt src/java/org/apache/lucene/index/DocumentsWriter.java
Date: Tue, 12 Feb 2008 10:56:22 GMT
Author: mikemccand
Date: Tue Feb 12 02:56:19 2008
New Revision: 620756

URL: http://svn.apache.org/viewvc?rev=620756&view=rev
Log:
LUCENE-1171 (backport to 2.3): fix cases where DocumentsWriter can deadlock on hitting an OOM error
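
The deadlock this patch targets: a thread that hits an aborting exception
(typically an OutOfMemoryError) sets a flag, and other indexing threads
block in wait() until abort() clears it.  With a plain boolean, one
finishing abort could drop the flag while a second abort was still
pending, letting waiters proceed into corrupt shared state or leaving
them stranded.  A simplified, hypothetical sketch of the counter-based
gate the diff introduces (illustrative class, not code from the patch):

class AbortGate {
  private int abortCount;                // replaces "boolean aborting"

  // Called by any thread that hits an aborting exception.
  synchronized void setAborting() {
    abortCount++;                        // overlapping aborts stack up
  }

  // Called from abort()'s finally clause; matchedSetAborting is true
  // only when this abort was triggered by a prior setAborting() call.
  synchronized void finishAbort(boolean matchedSetAborting) {
    if (matchedSetAborting)
      abortCount--;                      // undo exactly one setAborting()
    notifyAll();                         // always wake waiters, even if cleanup threw
  }

  // Other indexing threads block here until no abort is pending.
  synchronized void waitWhileAborting() throws InterruptedException {
    while (abortCount > 0)
      wait();
  }
}

A counter keeps the gate closed until every pending abort has actually
run, which a boolean cannot guarantee once two threads abort at once.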

Modified:
    lucene/java/branches/lucene_2_3/CHANGES.txt
    lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/DocumentsWriter.java

Modified: lucene/java/branches/lucene_2_3/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/branches/lucene_2_3/CHANGES.txt?rev=620756&r1=620755&r2=620756&view=diff
==============================================================================
--- lucene/java/branches/lucene_2_3/CHANGES.txt (original)
+++ lucene/java/branches/lucene_2_3/CHANGES.txt Tue Feb 12 02:56:19 2008
@@ -11,6 +11,9 @@
     documents have mixed term vectors (Suresh Guvvala via Mike
     McCandless).
 	
+ 2. LUCENE-1171: Fixed some cases where OOM errors could cause
+    deadlock in IndexWriter (Mike McCandless).
+	
 New features
 
 Optimizations

Modified: lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=620756&r1=620755&r2=620756&view=diff
==============================================================================
--- lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/branches/lucene_2_3/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Feb 12 02:56:19 2008
@@ -150,7 +150,7 @@
                                                   // pause (eg to flush)
   private boolean flushPending;                   // True when a thread has decided to flush
   private boolean bufferIsFull;                   // True when it's time to write segment
-  private boolean aborting;                       // True while abort is running
+  private int abortCount;                         // Non-zero while abort is pending or running
 
   private PrintStream infoStream;
 
@@ -323,7 +323,7 @@
   }
 
   synchronized void setAborting() {
-    aborting = true;
+    abortCount++;
   }
 
   /** Called if we hit an exception when adding docs,
@@ -338,7 +338,7 @@
     // unwinding the un-synchronized stack, no thread grabs
     // the corrupt ThreadState that hit the aborting
     // exception:
-    assert ae == null || aborting;
+    assert ae == null || abortCount>0;
 
     try {
 
@@ -361,18 +361,28 @@
         bufferedDeleteDocIDs.clear();
         numBufferedDeleteTerms = 0;
 
-        abortedFiles = files();
+        try {
+          abortedFiles = files();
+        } catch (Throwable t) {
+          abortedFiles = null;
+        }
 
-        // Discard pending norms:
-        final int numField = fieldInfos.size();
-        for (int i=0;i<numField;i++) {
-          FieldInfo fi = fieldInfos.fieldInfo(i);
-          if (fi.isIndexed && !fi.omitNorms) {
-            BufferedNorms n = norms[i];
-            if (n != null) {
-              n.out.reset();
-              n.reset();
+        docStoreSegment = null;
+        numDocsInStore = 0;
+        docStoreOffset = 0;
+        files = null;
+
+        // Clear vectors & fields from ThreadStates
+        for(int i=0;i<threadStates.length;i++) {
+          ThreadState state = threadStates[i];
+          state.tvfLocal.reset();
+          state.fdtLocal.reset();
+          if (state.localFieldsWriter != null) {
+            try {
+              state.localFieldsWriter.close();
+            } catch (Throwable t) {
             }
+            state.localFieldsWriter = null;
           }
         }
 
@@ -380,21 +390,21 @@
         if (tvx != null) {
           try {
             tvx.close();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
           }
           tvx = null;
         }
         if (tvd != null) {
           try {
             tvd.close();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
           }
           tvd = null;
         }
         if (tvf != null) {
           try {
             tvf.close();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
           }
           tvf = null;
         }
@@ -403,30 +413,28 @@
         if (fieldsWriter != null) {
           try {
             fieldsWriter.close();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
           }
           fieldsWriter = null;
         }
 
-        // Clear vectors & fields from ThreadStates
-        for(int i=0;i<threadStates.length;i++) {
-          ThreadState state = threadStates[i];
-          if (state.localFieldsWriter != null) {
-            state.localFieldsWriter.close();
-            state.localFieldsWriter = null;
+        // Discard pending norms:
+        final int numField = fieldInfos.size();
+        for (int i=0;i<numField;i++) {
+          FieldInfo fi = fieldInfos.fieldInfo(i);
+          if (fi.isIndexed && !fi.omitNorms) {
+            BufferedNorms n = norms[i];
+            if (n != null)
+              try {
+                n.reset();
+              } catch (Throwable t) {
+              }
           }
-          state.tvfLocal.reset();
-          state.fdtLocal.reset();
         }
 
         // Reset all postings data
         resetPostingsData();
 
-        docStoreSegment = null;
-        numDocsInStore = 0;
-        docStoreOffset = 0;
-        files = null;
-
       } finally {
         resumeAllThreads();
       }
@@ -445,7 +453,8 @@
           assert false: "unknown exception: " + t;
       }
     } finally {
-      aborting = false;
+      if (ae != null)
+        abortCount--;
       notifyAll();
     }
   }
@@ -454,20 +463,20 @@
   private void resetPostingsData() throws IOException {
     // All ThreadStates should be idle when we are called
     assert allThreadsIdle();
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].resetPostings();
-      threadStates[i].numThreads = 0;
-    }
     threadBindings.clear();
-    numBytesUsed = 0;
-    balanceRAM();
-    bufferIsFull = false;
-    flushPending = false;
     segment = null;
     numDocsInRAM = 0;
     nextDocID = 0;
     nextWriteDocID = 0;
     files = null;
+    balanceRAM();
+    bufferIsFull = false;
+    flushPending = false;
+    for(int i=0;i<threadStates.length;i++) {
+      threadStates[i].numThreads = 0;
+      threadStates[i].resetPostings();
+    }
+    numBytesUsed = 0;
   }
 
   // Returns true if an abort is in progress
@@ -480,7 +489,7 @@
         Thread.currentThread().interrupt();
       }
     }
-    return aborting;
+    return abortCount > 0;
   }
 
   synchronized void resumeAllThreads() {
@@ -627,19 +636,20 @@
     /** Clear the postings hash and return objects back to
      *  shared pool */
     public void resetPostings() throws IOException {
+      fieldGen = 0;
+      maxPostingsVectors = 0;
+      doFlushAfter = false;
       if (localFieldsWriter != null) {
         localFieldsWriter.close();
         localFieldsWriter = null;
       }
-      fieldGen = 0;
-      maxPostingsVectors = 0;
-      doFlushAfter = false;
       postingsPool.reset();
       charPool.reset();
       recyclePostings(postingsFreeList, postingsFreeCount);
       postingsFreeCount = 0;
       for(int i=0;i<numAllFieldData;i++) {
         FieldData fp = allFieldDataArray[i];
+        fp.lastGen = -1;
         if (fp.numPostings > 0)
           fp.resetPostingArrays();
       }
@@ -1382,6 +1392,8 @@
         final int limit = fieldCount;
         final Fieldable[] docFieldsFinal = docFields;
 
+        boolean doWriteVectors = true;
+
         // Walk through all occurrences in this doc for this
         // field:
         try {
@@ -1410,22 +1422,30 @@
 
             docFieldsFinal[j] = null;
           }
+        } catch (AbortException ae) {
+          doWriteVectors = false;
+          throw ae;
         } finally {
           if (postingsVectorsUpto > 0) {
-            // Add term vectors for this field
-            boolean success = false;
             try {
-              writeVectors(fieldInfo);
-              success = true;
-            } finally {
-              if (!success) {
-                // If we hit an exception inside
-                // writeVectors, the contents of tvfLocal
-                // can be corrupt, so we must discard all
-                // term vectors for this document:
-                numVectorFields = 0;
-                tvfLocal.reset();
+              if (doWriteVectors) {
+                // Add term vectors for this field
+                boolean success = false;
+                try {
+                  writeVectors(fieldInfo);
+                  success = true;
+                } finally {
+                  if (!success) {
+                    // If we hit an exception inside
+                    // writeVectors, the contents of tvfLocal
+                    // can be corrupt, so we must discard all
+                    // term vectors for this document:
+                    numVectorFields = 0;
+                    tvfLocal.reset();
+                  }
+                }
               }
+            } finally {
               if (postingsVectorsUpto > maxPostingsVectors)
                 maxPostingsVectors = postingsVectorsUpto;
               postingsVectorsUpto = 0;
@@ -1666,8 +1686,8 @@
 
             // Refill?
             if (0 == postingsFreeCount) {
-              postingsFreeCount = postingsFreeList.length;
               getPostings(postingsFreeList);
+              postingsFreeCount = postingsFreeList.length;
             }
 
             final int textLen1 = 1+tokenTextLen;
@@ -1772,7 +1792,7 @@
        *  occupied) or too large (< 20% occupied). */
       void rehashPostings(final int newSize) {
 
-        postingsHashMask = newSize-1;
+        final int newMask = newSize-1;
 
         Posting[] newHash = new Posting[newSize];
         for(int i=0;i<postingsHashSize;i++) {
@@ -1787,19 +1807,20 @@
             while (pos > start)
               code = (code*31) + text[--pos];
 
-            int hashPos = code & postingsHashMask;
+            int hashPos = code & newMask;
             assert hashPos >= 0;
             if (newHash[hashPos] != null) {
               final int inc = ((code>>8)+code)|1;
               do {
                 code += inc;
-                hashPos = code & postingsHashMask;
+                hashPos = code & newMask;
               } while (newHash[hashPos] != null);
             }
             newHash[hashPos] = p0;
           }
         }
 
+        postingsHashMask = newMask;
         postingsHash = newHash;
         postingsHashSize = newSize;
         postingsHashHalfSize = newSize >> 1;
@@ -2334,7 +2355,7 @@
     // Next, wait until my thread state is idle (in case
     // it's shared with other threads) and for threads to
     // not be paused nor a flush pending:
-    while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || aborting))
+    while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || abortCount > 0))
       try {
         wait();
       } catch (InterruptedException e) {
@@ -2556,7 +2577,7 @@
   /** Does the synchronized work to finish/flush the
    * inverted document. */
  private synchronized void finishDocument(ThreadState state) throws IOException, AbortException {
-    if (aborting) {
+    if (abortCount > 0) {
       // Forcefully idle this threadstate -- its state will
       // be reset by abort()
       state.isIdle = true;
@@ -3002,9 +3023,10 @@
   // Holds free pool of Posting instances
   private Posting[] postingsFreeList;
   private int postingsFreeCount;
+  private int postingsAllocCount;
 
   /* Allocate more Postings from shared pool */
-  private synchronized void getPostings(Posting[] postings) {
+  synchronized void getPostings(Posting[] postings) {
     numBytesUsed += postings.length * POSTING_NUM_BYTE;
     final int numToCopy;
     if (postingsFreeCount < postings.length)
@@ -3018,25 +3040,27 @@
 
     // Directly allocate the remainder if any
     if (numToCopy < postings.length) {
-      numBytesAlloc += (postings.length - numToCopy) * POSTING_NUM_BYTE;
+      final int extra = postings.length - numToCopy;
+      final int newPostingsAllocCount = postingsAllocCount + extra;
+      if (newPostingsAllocCount > postingsFreeList.length)
+        postingsFreeList = new Posting[(int) (1.25 * newPostingsAllocCount)];
+
       balanceRAM();
-      for(int i=numToCopy;i<postings.length;i++)
+      for(int i=numToCopy;i<postings.length;i++) {
         postings[i] = new Posting();
+        numBytesAlloc += POSTING_NUM_BYTE;
+        postingsAllocCount++;
+      }
     }
   }
 
-  private synchronized void recyclePostings(Posting[] postings, int numPostings) {
+  synchronized void recyclePostings(Posting[] postings, int numPostings) {
     // Move all Postings from this ThreadState back to our
-    // free list
-    if (postingsFreeCount + numPostings > postingsFreeList.length) {
-      final int newSize = (int) (1.25 * (postingsFreeCount + numPostings));
-      Posting[] newArray = new Posting[newSize];
-      System.arraycopy(postingsFreeList, 0, newArray, 0, postingsFreeCount);
-      postingsFreeList = newArray;
-    }
+    // free list.  We pre-allocated this array while we were
+    // creating Postings to make sure it's large enough
+    assert postingsFreeCount + numPostings <= postingsFreeList.length;
     System.arraycopy(postings, 0, postingsFreeList, postingsFreeCount, numPostings);
     postingsFreeCount += numPostings;
-    numBytesUsed -= numPostings * POSTING_NUM_BYTE;
   }
 
   /* Initial chunks size of the shared byte[] blocks used to
@@ -3066,7 +3090,6 @@
   synchronized void recycleByteBlocks(byte[][] blocks, int start, int end) {
     for(int i=start;i<end;i++)
       freeByteBlocks.add(blocks[i]);
-    numBytesUsed -= (end-start) * BYTE_BLOCK_SIZE;
   }
 
   /* Initial chunk size of the shared char[] blocks used to
@@ -3097,7 +3120,6 @@
   synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
     for(int i=0;i<numBlocks;i++)
       freeCharBlocks.add(blocks[i]);
-    numBytesUsed -= numBlocks * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
   }
 
   String toMB(long v) {
@@ -3178,6 +3200,7 @@
             numToFree = postingsFreeCount;
           Arrays.fill(postingsFreeList, postingsFreeCount-numToFree, postingsFreeCount, null);
           postingsFreeCount -= numToFree;
+          postingsAllocCount -= numToFree;
           numBytesAlloc -= numToFree * POSTING_NUM_BYTE;
         }
 


