lucene-java-commits mailing list archives

From: mikemcc...@apache.org
Subject: svn commit: r620478 - /lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
Date: Mon, 11 Feb 2008 12:55:50 GMT
Author: mikemccand
Date: Mon Feb 11 04:55:47 2008
New Revision: 620478

URL: http://svn.apache.org/viewvc?rev=620478&view=rev
Log:
LUCENE-1171: better robustness in DocumentsWriter on hitting OOM at various times
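
The fix replaces DocumentsWriter's boolean "aborting" flag with an int abortCount: a thread that decides to abort bumps the count at the throw site (setAborting), and abort() decrements it in a finally block, so the gate stays closed across overlapping aborts instead of being cleared by whichever abort finishes first. A minimal sketch of the counter pattern; the class and the waitReady loop are simplified stand-ins, not DocumentsWriter's actual code:

    // Sketch: counter-based abort gate. A boolean would be cleared as soon as
    // the first abort() finished, even with another abort still pending; the
    // counter keeps waiters blocked until every abort has completed.
    class AbortGate {
      private int abortCount;        // non-zero while an abort is pending or running

      synchronized void setAborting() {
        abortCount++;                // called at the throw site, before unwinding
      }

      synchronized void abort() {
        try {
          // ... discard buffered docs, close open files best-effort ...
        } finally {
          abortCount--;              // balanced with setAborting()
          notifyAll();               // wake threads parked in waitReady()
        }
      }

      synchronized void waitReady() throws InterruptedException {
        while (abortCount > 0)       // the equivalent checks appear in the diff
          wait();
      }
    }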

Modified:
    lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=620478&r1=620477&r2=620478&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Feb 11 04:55:47 2008
@@ -150,7 +150,7 @@
                                                   // pause (eg to flush)
   private boolean flushPending;                   // True when a thread has decided to flush
   private boolean bufferIsFull;                   // True when it's time to write segment
-  private boolean aborting;                       // True while abort is running
+  private int abortCount;                         // Non-zero while abort is pending or running
 
   private PrintStream infoStream;
 
@@ -323,7 +323,7 @@
   }
 
   synchronized void setAborting() {
-    aborting = true;
+    abortCount++;
   }
 
   /** Called if we hit an exception when adding docs,
@@ -338,7 +338,7 @@
     // unwinding the un-synchronized stack, no thread grabs
     // the corrupt ThreadState that hit the aborting
     // exception:
-    assert ae == null || aborting;
+    assert ae == null || abortCount>0;
 
     try {
 
@@ -361,18 +361,28 @@
         bufferedDeleteDocIDs.clear();
         numBufferedDeleteTerms = 0;
 
-        abortedFiles = files();
+        try {
+          abortedFiles = files();
+        } catch (Throwable t) {
+          abortedFiles = null;
+        }
 
-        // Discard pending norms:
-        final int numField = fieldInfos.size();
-        for (int i=0;i<numField;i++) {
-          FieldInfo fi = fieldInfos.fieldInfo(i);
-          if (fi.isIndexed && !fi.omitNorms) {
-            BufferedNorms n = norms[i];
-            if (n != null) {
-              n.out.reset();
-              n.reset();
+        docStoreSegment = null;
+        numDocsInStore = 0;
+        docStoreOffset = 0;
+        files = null;
+
+        // Clear vectors & fields from ThreadStates
+        for(int i=0;i<threadStates.length;i++) {
+          ThreadState state = threadStates[i];
+          state.tvfLocal.reset();
+          state.fdtLocal.reset();
+          if (state.localFieldsWriter != null) {
+            try {
+              state.localFieldsWriter.close();
+            } catch (Throwable t) {
             }
+            state.localFieldsWriter = null;
           }
         }
 
@@ -380,21 +390,21 @@
         if (tvx != null) {
           try {
             tvx.close();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
           }
           tvx = null;
         }
         if (tvd != null) {
           try {
             tvd.close();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
           }
           tvd = null;
         }
         if (tvf != null) {
           try {
             tvf.close();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
           }
           tvf = null;
         }
@@ -403,30 +413,28 @@
         if (fieldsWriter != null) {
           try {
             fieldsWriter.close();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
           }
           fieldsWriter = null;
         }
 
-        // Clear vectors & fields from ThreadStates
-        for(int i=0;i<threadStates.length;i++) {
-          ThreadState state = threadStates[i];
-          if (state.localFieldsWriter != null) {
-            state.localFieldsWriter.close();
-            state.localFieldsWriter = null;
+        // Discard pending norms:
+        final int numField = fieldInfos.size();
+        for (int i=0;i<numField;i++) {
+          FieldInfo fi = fieldInfos.fieldInfo(i);
+          if (fi.isIndexed && !fi.omitNorms) {
+            BufferedNorms n = norms[i];
+            if (n != null)
+              try {
+                n.reset();
+              } catch (Throwable t) {
+              }
           }
-          state.tvfLocal.reset();
-          state.fdtLocal.reset();
         }
 
         // Reset all postings data
         resetPostingsData();
 
-        docStoreSegment = null;
-        numDocsInStore = 0;
-        docStoreOffset = 0;
-        files = null;
-
       } finally {
         resumeAllThreads();
       }
@@ -445,7 +453,7 @@
           assert false: "unknown exception: " + t;
       }
     } finally {
-      aborting = false;
+      abortCount--;
       notifyAll();
     }
   }
@@ -454,20 +462,20 @@
   private void resetPostingsData() throws IOException {
     // All ThreadStates should be idle when we are called
     assert allThreadsIdle();
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].resetPostings();
-      threadStates[i].numThreads = 0;
-    }
     threadBindings.clear();
-    numBytesUsed = 0;
-    balanceRAM();
-    bufferIsFull = false;
-    flushPending = false;
     segment = null;
     numDocsInRAM = 0;
     nextDocID = 0;
     nextWriteDocID = 0;
     files = null;
+    balanceRAM();
+    bufferIsFull = false;
+    flushPending = false;
+    for(int i=0;i<threadStates.length;i++) {
+      threadStates[i].numThreads = 0;
+      threadStates[i].resetPostings();
+    }
+    numBytesUsed = 0;
   }
 
   // Returns true if an abort is in progress
@@ -480,7 +488,7 @@
         Thread.currentThread().interrupt();
       }
     }
-    return aborting;
+    return abortCount > 0;
   }
 
   synchronized void resumeAllThreads() {
@@ -627,19 +635,20 @@
     /** Clear the postings hash and return objects back to
      *  shared pool */
     public void resetPostings() throws IOException {
+      fieldGen = 0;
+      maxPostingsVectors = 0;
+      doFlushAfter = false;
       if (localFieldsWriter != null) {
         localFieldsWriter.close();
         localFieldsWriter = null;
       }
-      fieldGen = 0;
-      maxPostingsVectors = 0;
-      doFlushAfter = false;
       postingsPool.reset();
       charPool.reset();
       recyclePostings(postingsFreeList, postingsFreeCount);
       postingsFreeCount = 0;
       for(int i=0;i<numAllFieldData;i++) {
         FieldData fp = allFieldDataArray[i];
+        fp.lastGen = -1;
         if (fp.numPostings > 0)
           fp.resetPostingArrays();
       }
@@ -1383,6 +1392,8 @@
         final int limit = fieldCount;
         final Fieldable[] docFieldsFinal = docFields;
 
+        boolean doWriteVectors = true;
+
         // Walk through all occurrences in this doc for this
         // field:
         try {
@@ -1411,22 +1422,30 @@
 
             docFieldsFinal[j] = null;
           }
+        } catch (AbortException ae) {
+          doWriteVectors = false;
+          throw ae;
         } finally {
           if (postingsVectorsUpto > 0) {
-            // Add term vectors for this field
-            boolean success = false;
             try {
-              writeVectors(fieldInfo);
-              success = true;
-            } finally {
-              if (!success) {
-                // If we hit an exception inside
-                // writeVectors, the contents of tvfLocal
-                // can be corrupt, so we must discard all
-                // term vectors for this document:
-                numVectorFields = 0;
-                tvfLocal.reset();
+              if (doWriteVectors) {
+                // Add term vectors for this field
+                boolean success = false;
+                try {
+                  writeVectors(fieldInfo);
+                  success = true;
+                } finally {
+                  if (!success) {
+                    // If we hit an exception inside
+                    // writeVectors, the contents of tvfLocal
+                    // can be corrupt, so we must discard all
+                    // term vectors for this document:
+                    numVectorFields = 0;
+                    tvfLocal.reset();
+                  }
+                }
               }
+            } finally {
               if (postingsVectorsUpto > maxPostingsVectors)
                 maxPostingsVectors = postingsVectorsUpto;
               postingsVectorsUpto = 0;
@@ -1667,8 +1686,8 @@
 
             // Refill?
             if (0 == postingsFreeCount) {
-              postingsFreeCount = postingsFreeList.length;
               getPostings(postingsFreeList);
+              postingsFreeCount = postingsFreeList.length;
             }
 
             final int textLen1 = 1+tokenTextLen;
@@ -1773,7 +1792,7 @@
        *  occupied) or too large (< 20% occupied). */
       void rehashPostings(final int newSize) {
 
-        postingsHashMask = newSize-1;
+        final int newMask = newSize-1;
 
         Posting[] newHash = new Posting[newSize];
         for(int i=0;i<postingsHashSize;i++) {
@@ -1788,19 +1807,20 @@
             while (pos > start)
               code = (code*31) + text[--pos];
 
-            int hashPos = code & postingsHashMask;
+            int hashPos = code & newMask;
             assert hashPos >= 0;
             if (newHash[hashPos] != null) {
               final int inc = ((code>>8)+code)|1;
               do {
                 code += inc;
-                hashPos = code & postingsHashMask;
+                hashPos = code & newMask;
               } while (newHash[hashPos] != null);
             }
             newHash[hashPos] = p0;
           }
         }
 
+        postingsHashMask =  newMask;
         postingsHash = newHash;
         postingsHashSize = newSize;
         postingsHashHalfSize = newSize >> 1;
@@ -2335,7 +2355,7 @@
     // Next, wait until my thread state is idle (in case
     // it's shared with other threads) and for threads to
     // not be paused nor a flush pending:
-    while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || aborting))
+    while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || abortCount > 0))
       try {
         wait();
       } catch (InterruptedException e) {
@@ -2557,7 +2577,7 @@
   /** Does the synchronized work to finish/flush the
    * inverted document. */
   private synchronized void finishDocument(ThreadState state) throws IOException, AbortException {
-    if (aborting) {
+    if (abortCount > 0) {
       // Forcefully idle this threadstate -- its state will
       // be reset by abort()
       state.isIdle = true;
@@ -3003,9 +3023,10 @@
   // Holds free pool of Posting instances
   private Posting[] postingsFreeList;
   private int postingsFreeCount;
+  private int postingsAllocCount;
 
   /* Allocate more Postings from shared pool */
-  private synchronized void getPostings(Posting[] postings) {
+  synchronized void getPostings(Posting[] postings) {
     numBytesUsed += postings.length * POSTING_NUM_BYTE;
     final int numToCopy;
     if (postingsFreeCount < postings.length)
@@ -3019,25 +3040,27 @@
 
     // Directly allocate the remainder if any
     if (numToCopy < postings.length) {
-      numBytesAlloc += (postings.length - numToCopy) * POSTING_NUM_BYTE;
+      final int extra = postings.length - numToCopy;
+      final int newPostingsAllocCount = postingsAllocCount + extra;
+      if (newPostingsAllocCount > postingsFreeList.length)
+        postingsFreeList = new Posting[(int) (1.25 * newPostingsAllocCount)];
+
       balanceRAM();
-      for(int i=numToCopy;i<postings.length;i++)
+      for(int i=numToCopy;i<postings.length;i++) {
         postings[i] = new Posting();
+        numBytesAlloc += POSTING_NUM_BYTE;
+        postingsAllocCount++;
+      }
     }
   }
 
-  private synchronized void recyclePostings(Posting[] postings, int numPostings) {
+  synchronized void recyclePostings(Posting[] postings, int numPostings) {
     // Move all Postings from this ThreadState back to our
-    // free list
-    if (postingsFreeCount + numPostings > postingsFreeList.length) {
-      final int newSize = (int) (1.25 * (postingsFreeCount + numPostings));
-      Posting[] newArray = new Posting[newSize];
-      System.arraycopy(postingsFreeList, 0, newArray, 0, postingsFreeCount);
-      postingsFreeList = newArray;
-    }
+    // free list.  We pre-allocated this array while we were
+    // creating Postings to make sure it's large enough
+    assert postingsFreeCount + numPostings <= postingsFreeList.length;
     System.arraycopy(postings, 0, postingsFreeList, postingsFreeCount, numPostings);
     postingsFreeCount += numPostings;
-    numBytesUsed -= numPostings * POSTING_NUM_BYTE;
   }
 
   /* Initial chunks size of the shared byte[] blocks used to
@@ -3067,7 +3090,6 @@
   synchronized void recycleByteBlocks(byte[][] blocks, int start, int end) {
     for(int i=start;i<end;i++)
       freeByteBlocks.add(blocks[i]);
-    numBytesUsed -= (end-start) * BYTE_BLOCK_SIZE;
   }
 
   /* Initial chunk size of the shared char[] blocks used to
@@ -3098,7 +3120,6 @@
   synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
     for(int i=0;i<numBlocks;i++)
       freeCharBlocks.add(blocks[i]);
-    numBytesUsed -= numBlocks * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
   }
 
   String toMB(long v) {
@@ -3179,6 +3200,7 @@
             numToFree = postingsFreeCount;
           Arrays.fill(postingsFreeList, postingsFreeCount-numToFree, postingsFreeCount, null);
           postingsFreeCount -= numToFree;
+          postingsAllocCount -= numToFree;
           numBytesAlloc -= numToFree * POSTING_NUM_BYTE;
         }
 
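Most of the abort() hunks apply a single pattern: every cleanup step is individually wrapped, and the catches widen from IOException to Throwable, so a secondary failure (including a second OOM while computing abortedFiles or closing tvx/tvd/tvf and the fields writers) cannot stop the rest of the teardown. Sketched as a helper below; closeQuietly is our name for the pattern, not a Lucene method:

    // Best-effort teardown: isolate each close so one failure cannot prevent
    // the remaining buffered state from being discarded.
    final class CloseUtil {
      static void closeQuietly(java.io.Closeable c) {
        if (c == null)
          return;
        try {
          c.close();
        } catch (Throwable t) {
          // Already aborting; the partially written files get discarded anyway.
        }
      }
    }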

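Two smaller hunks close narrower windows: the refill path now calls getPostings(postingsFreeList) before raising postingsFreeCount, so a failed allocation cannot leave the count pointing at empty slots, and rehashPostings now builds the new table against a local newMask, assigning postingsHashMask (with the table and size fields) only once the copy has finished. A rough publish-last sketch of the latter, with simplified types (Posting reduced to Object, and a plain linear probe standing in for the diff's secondary hash):

    // Publish-last rehash: shared fields are written only after the new table
    // is complete, so a mid-rehash failure (e.g. OOM allocating newHash) cannot
    // leave mask and table out of sync. Field names mirror the diff; the class
    // itself is hypothetical.
    final class PostingsHashSketch {
      Object[] postingsHash = new Object[8];
      int postingsHashMask = 7;
      int postingsHashSize = 8;

      void rehash(int newSize) {
        final int newMask = newSize - 1;          // local until the copy succeeds
        Object[] newHash = new Object[newSize];   // may throw; state still untouched
        for (Object p : postingsHash) {
          if (p == null)
            continue;
          int hashPos = p.hashCode() & newMask;
          while (newHash[hashPos] != null)        // simple linear probe for the sketch
            hashPos = (hashPos + 1) & newMask;
          newHash[hashPos] = p;
        }
        postingsHashMask = newMask;               // all-or-nothing publish
        postingsHash = newHash;
        postingsHashSize = newSize;
      }
    }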

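The pool changes at the bottom move free-list growth out of recyclePostings and into getPostings: the array is pre-sized against postingsAllocCount (the total number of Postings ever allocated, grown by 1.25x) at allocation time, so recycling on the flush/abort path never needs to allocate and can simply assert capacity. A sketch of that grow-on-allocate invariant, with the class and Posting stub simplified from the diff:

    // Grow-on-allocate pool: capacity for every Posting ever created is
    // reserved up front, so returning objects to the pool is allocation-free
    // and cannot itself hit OOM during abort.
    final class PostingPoolSketch {
      static final class Posting {}

      private Posting[] free = new Posting[16];
      private int freeCount;     // live entries in free[]
      private int allocCount;    // total Postings ever allocated

      synchronized void get(Posting[] out) {
        final int fromFree = Math.min(freeCount, out.length);
        System.arraycopy(free, freeCount - fromFree, out, 0, fromFree);
        freeCount -= fromFree;

        final int extra = out.length - fromFree;
        if (extra > 0) {
          final int newAlloc = allocCount + extra;
          if (newAlloc > free.length)
            // freeCount is 0 here (the free list was just drained), so no copy
            // is needed; grow now, while an OOM is still recoverable:
            free = new Posting[(int) (1.25 * newAlloc)];
          for (int i = fromFree; i < out.length; i++) {
            out[i] = new Posting();
            allocCount++;
          }
        }
      }

      synchronized void recycle(Posting[] in, int num) {
        // Pre-sized in get(), so this never allocates:
        assert freeCount + num <= free.length;
        System.arraycopy(in, 0, free, freeCount, num);
        freeCount += num;
      }
    }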
