lucene-java-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r677865 [2/5] - in /lucene/java/trunk: ./ src/java/org/apache/lucene/index/ src/java/org/apache/lucene/store/ src/java/org/apache/lucene/util/ src/test/org/apache/lucene/ src/test/org/apache/lucene/index/ src/test/org/apache/lucene/search/
Date Fri, 18 Jul 2008 09:20:14 GMT
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=677865&r1=677864&r2=677865&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Fri Jul 18 02:20:12 2008
@@ -25,21 +25,19 @@
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.util.UnicodeUtil;
+import org.apache.lucene.util.ArrayUtil;
 
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.ArrayList;
 import java.util.Map.Entry;
 import java.text.NumberFormat;
-import java.util.Collections;
 
 /**
  * This class accepts multiple added documents and directly
@@ -48,31 +46,23 @@
  * (with DocumentWriter) and doing standard merges on those
  * segments.
  *
- * When a document is added, its stored fields (if any) and
- * term vectors (if any) are immediately written to the
- * Directory (ie these do not consume RAM).  The freq/prox
- * postings are accumulated into a Postings hash table keyed
- * by term.  Each entry in this hash table holds a separate
- * byte stream (allocated as incrementally growing slices
- * into large shared byte[] arrays) for freq and prox, that
- * contains the postings data for multiple documents.  If
- * vectors are enabled, each unique term for each document
- * also allocates a PostingVector instance to similarly
- * track the offsets & positions byte stream.
- *
- * Once the Postings hash is full (ie is consuming the
- * allowed RAM) or the number of added docs is large enough
- * (in the case we are flushing by doc count instead of RAM
- * usage), we create a real segment and flush it to disk and
- * reset the Postings hash.
- *
- * In adding a document we first organize all of its fields
- * by field name.  We then process field by field, and
- * record the Posting hash per-field.  After each field we
- * flush its term vectors.  When it's time to flush the full
- * segment we first sort the fields by name, and then go
- * field by field and sorts its postings.
+ * Each added document is passed to the {@link DocConsumer},
+ * which in turn processes the document and interacts with
+ * other consumers in the indexing chain.  Certain
+ * consumers, like {@link StoredFieldsWriter} and {@link
+ * TermVectorsTermsWriter}, digest a document and
+ * immediately write bytes to the "doc store" files (ie,
+ * they do not consume RAM per document, except while they
+ * are processing the document).
  *
+ * Other consumers, eg {@link FreqProxTermsWriter} and
+ * {@link NormsWriter}, buffer bytes in RAM and flush only
+ * when a new segment is produced.
+ *
+ * Once we have used our allowed RAM buffer, or the number
+ * of added docs is large enough (in the case we are
+ * flushing by doc count instead of RAM usage), we create a
+ * real segment and flush it to the Directory.
  *
  * Threads:
  *
@@ -88,12 +78,6 @@
  * call).  Finally the synchronized "finishDocument" is
  * called to flush changes to the directory.
  *
- * Each ThreadState instance has its own Posting hash. Once
- * we're using too much RAM, we flush all Posting hashes to
- * a segment by merging the docIDs in the posting lists for
- * the same term across multiple thread states (see
- * writeSegment and appendPostings).
- *
  * When flush is called by IndexWriter, or, we flush
  * internally when autoCommit=false, we forcefully idle all
  * threads and flush only once they are all idle.  This
@@ -128,38 +112,79 @@
   IndexWriter writer;
   Directory directory;
 
-  FieldInfos fieldInfos = new FieldInfos(); // All fields we've seen
-  IndexOutput tvx, tvf, tvd;              // To write term vectors
-  FieldsWriter fieldsWriter;              // To write stored fields
-
   String segment;                         // Current segment we are working on
-  String docStoreSegment;                 // Current doc-store segment we are writing
+  private String docStoreSegment;         // Current doc-store segment we are writing
   private int docStoreOffset;                     // Current starting doc-store offset of current segment
 
   private int nextDocID;                          // Next docID to be added
   private int numDocsInRAM;                       // # docs buffered in RAM
   int numDocsInStore;                     // # docs written to doc stores
-  private int nextWriteDocID;                     // Next docID to be written
 
   // Max # ThreadState instances; if there are more threads
   // than this they share ThreadStates
   private final static int MAX_THREAD_STATE = 5;
   private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
   private final HashMap threadBindings = new HashMap();
-  private int numWaiting;
-  private final DocumentsWriterThreadState[] waitingThreadStates = new DocumentsWriterThreadState[MAX_THREAD_STATE];
-  private int pauseThreads;                       // Non-zero when we need all threads to
-                                                  // pause (eg to flush)
+
+  private int pauseThreads;               // Non-zero when we need all threads to
+                                          // pause (eg to flush)
   boolean flushPending;                   // True when a thread has decided to flush
   boolean bufferIsFull;                   // True when it's time to write segment
-  private int abortCount;                         // Non-zero while abort is pending or running
+  private boolean aborting;               // True if an abort is pending
 
   PrintStream infoStream;
-
-  boolean hasNorms;                       // Whether any norms were seen since last flush
+  int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
+  Similarity similarity;
 
   List newFiles;
 
+  static class DocState {
+    DocumentsWriter docWriter;
+    Analyzer analyzer;
+    int maxFieldLength;
+    PrintStream infoStream;
+    Similarity similarity;
+    int docID;
+    Document doc;
+    String maxTermPrefix;
+
+    // Only called by asserts
+    public boolean testPoint(String name) {
+      return docWriter.writer.testPoint(name);
+    }
+  }
+
+  static class FlushState {
+    DocumentsWriter docWriter;
+    Directory directory;
+    String segmentName;
+    String docStoreSegmentName;
+    int numDocsInRAM;
+    int numDocsInStore;
+    Collection flushedFiles;
+
+    public String segmentFileName(String ext) {
+      return segmentName + "." + ext;
+    }
+  }
+
+  /** Consumer returns this on each doc.  This holds any
+   *  state that must be flushed synchronized "in docID
+   *  order".  We gather these and flush them in order. */
+  abstract static class DocWriter {
+    DocWriter next;
+    int docID;
+    abstract void finish() throws IOException;
+    abstract void abort();
+    abstract long sizeInBytes();
+
+    void setNext(DocWriter next) {
+      this.next = next;
+    }
+  };
+
+  final DocConsumer consumer;
+
   // Deletes done after the last flush; these are discarded
   // on abort
   private BufferedDeletes deletesInRAM = new BufferedDeletes();
@@ -175,8 +200,15 @@
   // How much RAM we can use before flushing.  This is 0 if
   // we are flushing by doc count instead.
   private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);
+  private long waitQueuePauseBytes = (long) (ramBufferSize*0.1);
+  private long waitQueueResumeBytes = (long) (ramBufferSize*0.05);
+
+  // If we've allocated 5% over our RAM budget, we then
+  // free down to 95%
+  private long freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*1.05);
+  private long freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*0.95);
 
-  // Flush @ this number of docs.  If rarmBufferSize is
+  // Flush @ this number of docs.  If ramBufferSize is
   // non-zero we will flush by RAM usage instead.
   private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
 
@@ -194,34 +226,80 @@
 
   private boolean closed;
 
-  // Coarse estimates used to measure RAM usage of buffered deletes
-  private static int OBJECT_HEADER_BYTES = 8;
-
-  BufferedNorms[] norms = new BufferedNorms[0];   // Holds norms until we flush
-
   DocumentsWriter(Directory directory, IndexWriter writer) throws IOException {
     this.directory = directory;
     this.writer = writer;
-    flushedDocCount = writer.docCount();
-    postingsFreeList = new Posting[0];
+    this.similarity = writer.getSimilarity();
+    flushedDocCount = writer.maxDoc();
+
+    /*
+      This is the current indexing chain:
+
+      DocConsumer / DocConsumerPerThread
+        --> code: DocFieldProcessor / DocFieldProcessorPerThread
+          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
+            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
+              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
+                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
+                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
+                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
+                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
+                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
+              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
+    */
+
+    // TODO FI: this should be something the user can pass in
+    // Build up indexing chain:
+    final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(this);
+    final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
+
+    final InvertedDocConsumer  termsHash = new TermsHash(this, true, freqProxWriter,
+                                                         new TermsHash(this, false, termVectorsWriter, null));
+    final NormsWriter normsWriter = new NormsWriter();
+    final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
+    final StoredFieldsWriter fieldsWriter = new StoredFieldsWriter(this);
+    final DocFieldConsumers docFieldConsumers = new DocFieldConsumers(docInverter, fieldsWriter);
+    consumer = new DocFieldProcessor(this, docFieldConsumers);
   }
 
   /** If non-null, various details of indexing are printed
    *  here. */
-  void setInfoStream(PrintStream infoStream) {
+  synchronized void setInfoStream(PrintStream infoStream) {
     this.infoStream = infoStream;
+    for(int i=0;i<threadStates.length;i++)
+      threadStates[i].docState.infoStream = infoStream;
+  }
+
+  synchronized void setMaxFieldLength(int maxFieldLength) {
+    this.maxFieldLength = maxFieldLength;
+    for(int i=0;i<threadStates.length;i++)
+      threadStates[i].docState.maxFieldLength = maxFieldLength;
+  }
+
+  synchronized void setSimilarity(Similarity similarity) {
+    this.similarity = similarity;
+    for(int i=0;i<threadStates.length;i++)
+      threadStates[i].docState.similarity = similarity;
   }
 
   /** Set how much RAM we can use before flushing. */
-  void setRAMBufferSizeMB(double mb) {
+  synchronized void setRAMBufferSizeMB(double mb) {
     if (mb == IndexWriter.DISABLE_AUTO_FLUSH) {
       ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
+      waitQueuePauseBytes = 4*1024*1024;
+      waitQueueResumeBytes = 2*1024*1024;
     } else {
       ramBufferSize = (long) (mb*1024*1024);
+      waitQueuePauseBytes = (long) (ramBufferSize*0.1);
+      waitQueueResumeBytes = (long) (ramBufferSize*0.05);
+      freeTrigger = (long) (1.05 * ramBufferSize);
+      freeLevel = (long) (0.95 * ramBufferSize);
     }
   }
 
-  double getRAMBufferSizeMB() {
+  synchronized double getRAMBufferSizeMB() {
     if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) {
       return ramBufferSize;
     } else {
@@ -252,7 +330,7 @@
   /** Returns the current doc store segment we are writing
    *  to.  This will be the same as segment when autoCommit
    *  is true. */
-  String getDocStoreSegment() {
+  synchronized String getDocStoreSegment() {
     return docStoreSegment;
   }
 
@@ -265,51 +343,40 @@
   /** Closes the current open doc stores and returns the doc
    *  store segment name.  This returns null if there are
    *  no buffered documents. */
-  String closeDocStore() throws IOException {
-
+  synchronized String closeDocStore() throws IOException {
+    
     assert allThreadsIdle();
 
-    List flushedFiles = files();
-
     if (infoStream != null)
-      message("closeDocStore: " + flushedFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
+      message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
+    
+    boolean success = false;
 
-    if (flushedFiles.size() > 0) {
-      files = null;
+    try {
+      initFlushState(true);
+      closedFiles.clear();
 
-      if (tvx != null) {
-        // At least one doc in this run had term vectors enabled
-        assert docStoreSegment != null;
-        tvx.close();
-        tvf.close();
-        tvd.close();
-        tvx = null;
-        assert 4+numDocsInStore*16 == directory.fileLength(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION):
-          "after flush: tvx size mismatch: " + numDocsInStore + " docs vs " + directory.fileLength(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION) + " length in bytes of " + docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION;
-      }
-
-      if (fieldsWriter != null) {
-        assert docStoreSegment != null;
-        fieldsWriter.close();
-        fieldsWriter = null;
-        assert 4+numDocsInStore*8 == directory.fileLength(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION):
-          "after flush: fdx size mismatch: " + numDocsInStore + " docs vs " + directory.fileLength(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION) + " length in bytes of " + docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION;
-      }
+      consumer.closeDocStore(flushState);
+      assert 0 == openFiles.size();
 
       String s = docStoreSegment;
       docStoreSegment = null;
       docStoreOffset = 0;
       numDocsInStore = 0;
+      success = true;
       return s;
-    } else {
-      return null;
+    } finally {
+      if (!success) {
+        abort();
+      }
     }
   }
 
-  List files = null;                      // Cached list of files we've created
-  private List abortedFiles = null;               // List of files that were written before last abort()
+  private Collection abortedFiles;               // List of files that were written before last abort()
 
-  List abortedFiles() {
+  private FlushState flushState;
+
+  Collection abortedFiles() {
     return abortedFiles;
   }
 
@@ -317,186 +384,106 @@
     writer.message("DW: " + message);
   }
 
-  /* Returns list of files in use by this instance,
-   * including any flushed segments. */
-  synchronized List files() {
-
-    if (files != null)
-      return files;
+  final List openFiles = new ArrayList();
+  final List closedFiles = new ArrayList();
 
-    files = new ArrayList();
+  /* Returns Collection of files in use by this instance,
+   * including any flushed segments. */
+  synchronized List openFiles() {
+    return (List) ((ArrayList) openFiles).clone();
+  }
 
-    // Stored fields:
-    if (fieldsWriter != null) {
-      assert docStoreSegment != null;
-      files.add(docStoreSegment + "." + IndexFileNames.FIELDS_EXTENSION);
-      files.add(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
-    }
+  synchronized List closedFiles() {
+    return (List) ((ArrayList) closedFiles).clone();
+  }
 
-    // Vectors:
-    if (tvx != null) {
-      assert docStoreSegment != null;
-      files.add(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
-      files.add(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
-      files.add(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
-    }
+  synchronized void addOpenFile(String name) {
+    assert !openFiles.contains(name);
+    openFiles.add(name);
+  }
 
-    return files;
+  synchronized void removeOpenFile(String name) {
+    assert openFiles.contains(name);
+    openFiles.remove(name);
+    closedFiles.add(name);
   }
 
   synchronized void setAborting() {
-    abortCount++;
+    aborting = true;
   }
 
   /** Called if we hit an exception at a bad time (when
    *  updating the index files) and must discard all
    *  currently buffered docs.  This resets our state,
-   *  discarding any docs added since last flush.  If ae is
-   *  non-null, it contains the root cause exception (which
-   *  we re-throw after we are done aborting). */
-  synchronized void abort(AbortException ae) throws IOException {
-
-    // Anywhere that throws an AbortException must first
-    // mark aborting to make sure while the exception is
-    // unwinding the un-synchronized stack, no thread grabs
-    // the corrupt ThreadState that hit the aborting
-    // exception:
-    assert ae == null || abortCount>0;
+   *  discarding any docs added since last flush. */
+  synchronized void abort() throws IOException {
 
     try {
-
-      if (infoStream != null)
-        message("docWriter: now abort");
+      message("docWriter: now abort");
 
       // Forcefully remove waiting ThreadStates from line
-      for(int i=0;i<numWaiting;i++)
-        waitingThreadStates[i].isIdle = true;
-      numWaiting = 0;
+      waitQueue.abort();
 
-      // Wait for all other threads to finish with DocumentsWriter:
+      // Wait for all other threads to finish with
+      // DocumentsWriter:
       pauseAllThreads();
 
-      assert 0 == numWaiting;
-
       try {
 
-        deletesInRAM.clear();
+        assert 0 == waitQueue.numWaiting;
+
+        waitQueue.waitingBytes = 0;
 
         try {
-          abortedFiles = files();
+          abortedFiles = openFiles();
         } catch (Throwable t) {
           abortedFiles = null;
         }
 
-        docStoreSegment = null;
-        numDocsInStore = 0;
-        docStoreOffset = 0;
-        files = null;
+        deletesInRAM.clear();
 
-        // Clear vectors & fields from ThreadStates
-        for(int i=0;i<threadStates.length;i++) {
-          DocumentsWriterThreadState state = threadStates[i];
-          state.tvfLocal.reset();
-          state.fdtLocal.reset();
-          if (state.localFieldsWriter != null) {
-            try {
-              state.localFieldsWriter.close();
-            } catch (Throwable t) {
-            }
-            state.localFieldsWriter = null;
-          }
-        }
+        openFiles.clear();
 
-        // Reset vectors writer
-        if (tvx != null) {
+        for(int i=0;i<threadStates.length;i++)
           try {
-            tvx.close();
+            threadStates[i].consumer.abort();
           } catch (Throwable t) {
           }
-          tvx = null;
-        }
-        if (tvd != null) {
-          try {
-            tvd.close();
-          } catch (Throwable t) {
-          }
-          tvd = null;
-        }
-        if (tvf != null) {
-          try {
-            tvf.close();
-          } catch (Throwable t) {
-          }
-          tvf = null;
-        }
 
-        // Reset fields writer
-        if (fieldsWriter != null) {
-          try {
-            fieldsWriter.close();
-          } catch (Throwable t) {
-          }
-          fieldsWriter = null;
+        try {
+          consumer.abort();
+        } catch (Throwable t) {
         }
 
-        // Discard pending norms:
-        final int numField = fieldInfos.size();
-        for (int i=0;i<numField;i++) {
-          FieldInfo fi = fieldInfos.fieldInfo(i);
-          if (fi.isIndexed && !fi.omitNorms) {
-            BufferedNorms n = norms[i];
-            if (n != null)
-              try {
-                n.reset();
-              } catch (Throwable t) {
-              }
-          }
-        }
+        docStoreSegment = null;
+        numDocsInStore = 0;
+        docStoreOffset = 0;
 
         // Reset all postings data
-        resetPostingsData();
+        doAfterFlush();
 
       } finally {
         resumeAllThreads();
       }
-
-      // If we have a root cause exception, re-throw it now:
-      if (ae != null) {
-        Throwable t = ae.getCause();
-        if (t instanceof IOException)
-          throw (IOException) t;
-        else if (t instanceof RuntimeException)
-          throw (RuntimeException) t;
-        else if (t instanceof Error)
-          throw (Error) t;
-        else
-          // Should not get here
-          assert false: "unknown exception: " + t;
-      }
     } finally {
-      if (ae != null)
-        abortCount--;
+      aborting = false;
       notifyAll();
     }
   }
 
   /** Reset after a flush */
-  private void resetPostingsData() throws IOException {
+  private void doAfterFlush() throws IOException {
     // All ThreadStates should be idle when we are called
     assert allThreadsIdle();
     threadBindings.clear();
+    waitQueue.reset();
     segment = null;
     numDocsInRAM = 0;
     nextDocID = 0;
-    nextWriteDocID = 0;
-    files = null;
-    balanceRAM();
     bufferIsFull = false;
     flushPending = false;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].numThreads = 0;
-      threadStates[i].resetPostings();
-    }
+    for(int i=0;i<threadStates.length;i++)
+      threadStates[i].doAfterFlush();
     numBytesUsed = 0;
   }
 
@@ -510,7 +497,8 @@
         Thread.currentThread().interrupt();
       }
     }
-    return abortCount > 0;
+
+    return aborting;
   }
 
   synchronized void resumeAllThreads() {
@@ -527,71 +515,98 @@
     return true;
   }
 
+  synchronized private void initFlushState(boolean onlyDocStore) {
+    initSegmentName(onlyDocStore);
+
+    if (flushState == null) {
+      flushState = new FlushState();
+      flushState.directory = directory;
+      flushState.docWriter = this;
+    }
+
+    flushState.docStoreSegmentName = docStoreSegment;
+    flushState.segmentName = segment;
+    flushState.numDocsInRAM = numDocsInRAM;
+    flushState.numDocsInStore = numDocsInStore;
+    flushState.flushedFiles = new HashSet();
+  }
+
   /** Flush all pending docs to a new segment */
   synchronized int flush(boolean closeDocStore) throws IOException {
 
     assert allThreadsIdle();
 
-    if (segment == null)
-      // In case we are asked to flush an empty segment
-      segment = writer.newSegmentName();
-
-    newFiles = new ArrayList();
+    assert numDocsInRAM > 0;
 
-    docStoreOffset = numDocsInStore;
+    assert nextDocID == numDocsInRAM;
+    assert waitQueue.numWaiting == 0;
+    assert waitQueue.waitingBytes == 0;
 
-    int docCount;
+    initFlushState(false);
 
-    assert numDocsInRAM > 0;
+    docStoreOffset = numDocsInStore;
 
     if (infoStream != null)
-      message("flush postings as segment " + segment + " numDocs=" + numDocsInRAM);
+      message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
     
     boolean success = false;
 
     try {
 
       if (closeDocStore) {
-        assert docStoreSegment != null;
-        assert docStoreSegment.equals(segment);
-        newFiles.addAll(files());
+        assert flushState.docStoreSegmentName != null;
+        assert flushState.docStoreSegmentName.equals(flushState.segmentName);
         closeDocStore();
+        flushState.numDocsInStore = 0;
       }
-    
-      fieldInfos.write(directory, segment + ".fnm");
 
-      docCount = numDocsInRAM;
+      Collection threads = new HashSet();
+      for(int i=0;i<threadStates.length;i++)
+        threads.add(threadStates[i].consumer);
+      consumer.flush(threads, flushState);
 
-      newFiles.addAll(writeSegment());
+      if (infoStream != null) {
+        final long newSegmentSize = segmentSize(flushState.segmentName);
+        String message = "  oldRAMSize=" + numBytesUsed +
+          " newFlushedSize=" + newSegmentSize +
+          " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
+          " new/old=" + nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
+        message(message);
+      }
 
-      flushedDocCount += docCount;
+      flushedDocCount += flushState.numDocsInRAM;
+
+      doAfterFlush();
 
       success = true;
 
     } finally {
-      if (!success)
-        abort(null);
+      if (!success) {
+        abort();
+      }
     }
 
-    return docCount;
+    assert waitQueue.waitingBytes == 0;
+
+    return flushState.numDocsInRAM;
   }
 
   /** Build compound file for the segment we just flushed */
-  void createCompoundFile(String segment) throws IOException
-  {
+  void createCompoundFile(String segment) throws IOException {
+    
     CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION);
-    final int size = newFiles.size();
-    for(int i=0;i<size;i++)
-      cfsWriter.addFile((String) newFiles.get(i));
+    Iterator it = flushState.flushedFiles.iterator();
+    while(it.hasNext())
+      cfsWriter.addFile((String) it.next());
       
     // Perform the merge
     cfsWriter.close();
   }
 
   /** Set flushPending if it is not already set and returns
-   *  whether it was set. This is used by IndexWriter to *
+   *  whether it was set. This is used by IndexWriter to
    *  trigger a single flush even when multiple threads are
-   *  * trying to do so. */
+   *  trying to do so. */
   synchronized boolean setFlushPending() {
     if (flushPending)
       return false;
@@ -605,350 +620,26 @@
     flushPending = false;
   }
 
-  private static final byte defaultNorm = Similarity.encodeNorm(1.0f);
-
-  /** Write norms in the "true" segment format.  This is
-   *  called only during commit, to create the .nrm file. */
-  void writeNorms(String segmentName, int totalNumDoc) throws IOException {
-
-    IndexOutput normsOut = directory.createOutput(segmentName + "." + IndexFileNames.NORMS_EXTENSION);
-
-    try {
-      normsOut.writeBytes(SegmentMerger.NORMS_HEADER, 0, SegmentMerger.NORMS_HEADER.length);
-
-      final int numField = fieldInfos.size();
-
-      for (int fieldIdx=0;fieldIdx<numField;fieldIdx++) {
-        FieldInfo fi = fieldInfos.fieldInfo(fieldIdx);
-        if (fi.isIndexed && !fi.omitNorms) {
-          BufferedNorms n = norms[fieldIdx];
-          final long v;
-          if (n == null)
-            v = 0;
-          else {
-            v = n.out.getFilePointer();
-            n.out.writeTo(normsOut);
-            n.reset();
-          }
-          if (v < totalNumDoc)
-            fillBytes(normsOut, defaultNorm, (int) (totalNumDoc-v));
-        }
-      }
-    } finally {
-      normsOut.close();
-    }
-  }
-
-  private DefaultSkipListWriter skipListWriter = null;
-
-  private boolean currentFieldStorePayloads;
-
-  /** Creates a segment from all Postings in the Postings
-   *  hashes across all ThreadStates & FieldDatas. */
-  private List writeSegment() throws IOException {
-
-    assert allThreadsIdle();
-
-    assert nextDocID == numDocsInRAM;
-
-    final String segmentName;
-
-    segmentName = segment;
-
-    TermInfosWriter termsOut = new TermInfosWriter(directory, segmentName, fieldInfos,
-                                                   writer.getTermIndexInterval());
-
-    IndexOutput freqOut = directory.createOutput(segmentName + ".frq");
-    IndexOutput proxOut = directory.createOutput(segmentName + ".prx");
-
-    // Gather all FieldData's that have postings, across all
-    // ThreadStates
-    ArrayList allFields = new ArrayList();
-    assert allThreadsIdle();
-    for(int i=0;i<threadStates.length;i++) {
-      DocumentsWriterThreadState state = threadStates[i];
-      state.trimFields();
-      final int numFields = state.numAllFieldData;
-      for(int j=0;j<numFields;j++) {
-        DocumentsWriterFieldData fp = state.allFieldDataArray[j];
-        if (fp.numPostings > 0)
-          allFields.add(fp);
-      }
-    }
-
-    // Sort by field name
-    Collections.sort(allFields);
-    final int numAllFields = allFields.size();
-
-    skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval,
-                                               termsOut.maxSkipLevels,
-                                               numDocsInRAM, freqOut, proxOut);
-
-    int start = 0;
-    while(start < numAllFields) {
-
-      final String fieldName = ((DocumentsWriterFieldData) allFields.get(start)).fieldInfo.name;
-
-      int end = start+1;
-      while(end < numAllFields && ((DocumentsWriterFieldData) allFields.get(end)).fieldInfo.name.equals(fieldName))
-        end++;
-      
-      DocumentsWriterFieldData[] fields = new DocumentsWriterFieldData[end-start];
-      for(int i=start;i<end;i++)
-        fields[i-start] = (DocumentsWriterFieldData) allFields.get(i);
-
-      // If this field has postings then add them to the
-      // segment
-      appendPostings(fields, termsOut, freqOut, proxOut);
-
-      for(int i=0;i<fields.length;i++)
-        fields[i].resetPostingArrays();
-
-      start = end;
-    }
-
-    freqOut.close();
-    proxOut.close();
-    termsOut.close();
-    
-    // Record all files we have flushed
-    List flushedFiles = new ArrayList();
-    flushedFiles.add(segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION));
-    flushedFiles.add(segmentFileName(IndexFileNames.FREQ_EXTENSION));
-    flushedFiles.add(segmentFileName(IndexFileNames.PROX_EXTENSION));
-    flushedFiles.add(segmentFileName(IndexFileNames.TERMS_EXTENSION));
-    flushedFiles.add(segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));
-
-    if (hasNorms) {
-      writeNorms(segmentName, numDocsInRAM);
-      flushedFiles.add(segmentFileName(IndexFileNames.NORMS_EXTENSION));
-    }
-
-    if (infoStream != null) {
-      final long newSegmentSize = segmentSize(segmentName);
-      String message = "  oldRAMSize=" + numBytesUsed + " newFlushedSize=" + newSegmentSize + " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) + " new/old=" + nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
-      message(message);
-    }
-
-    resetPostingsData();
-    
-    // Maybe downsize postingsFreeList array
-    if (postingsFreeList.length > 1.5*postingsFreeCount) {
-      int newSize = postingsFreeList.length;
-      while(newSize > 1.25*postingsFreeCount) {
-        newSize = (int) (newSize*0.8);
-      }
-      Posting[] newArray = new Posting[newSize];
-      System.arraycopy(postingsFreeList, 0, newArray, 0, postingsFreeCount);
-      postingsFreeList = newArray;
-    }
-
-    return flushedFiles;
-  }
-
   synchronized void pushDeletes() {
     deletesFlushed.update(deletesInRAM);
   }
 
-  /** Returns the name of the file with this extension, on
-   *  the current segment we are working on. */
-  private String segmentFileName(String extension) {
-    return segment + "." + extension;
-  }
-
-  private static int compareText(final char[] text1, int pos1, final char[] text2, int pos2) {
-    while(true) {
-      final char c1 = text1[pos1++];
-      final char c2 = text2[pos2++];
-      if (c1 != c2) {
-        if (0xffff == c2)
-          return 1;
-        else if (0xffff == c1)
-          return -1;
-        else
-          return c1-c2;
-      } else if (0xffff == c1)
-        return 0;
-    }
+  synchronized void close() {
+    closed = true;
+    notifyAll();
   }
 
-  private final TermInfo termInfo = new TermInfo(); // minimize consing
-
-  final UnicodeUtil.UTF8Result termsUTF8 = new UnicodeUtil.UTF8Result();
-
-  /* Walk through all unique text tokens (Posting
-   * instances) found in this field and serialize them
-   * into a single RAM segment. */
-  void appendPostings(DocumentsWriterFieldData[] fields,
-                      TermInfosWriter termsOut,
-                      IndexOutput freqOut,
-                      IndexOutput proxOut)
-    throws CorruptIndexException, IOException {
-
-    final int fieldNumber = fields[0].fieldInfo.number;
-    int numFields = fields.length;
-
-    final DocumentsWriterFieldMergeState[] mergeStates = new DocumentsWriterFieldMergeState[numFields];
-
-    for(int i=0;i<numFields;i++) {
-      DocumentsWriterFieldMergeState fms = mergeStates[i] = new DocumentsWriterFieldMergeState();
-      fms.field = fields[i];
-      fms.postings = fms.field.sortPostings();
-
-      assert fms.field.fieldInfo == fields[0].fieldInfo;
-
-      // Should always be true
-      boolean result = fms.nextTerm();
-      assert result;
+  synchronized void initSegmentName(boolean onlyDocStore) {
+    if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
+      segment = writer.newSegmentName();
+      assert numDocsInRAM == 0;
     }
-
-    final int skipInterval = termsOut.skipInterval;
-    currentFieldStorePayloads = fields[0].fieldInfo.storePayloads;
-
-    DocumentsWriterFieldMergeState[] termStates = new DocumentsWriterFieldMergeState[numFields];
-
-    while(numFields > 0) {
-
-      // Get the next term to merge
-      termStates[0] = mergeStates[0];
-      int numToMerge = 1;
-
-      for(int i=1;i<numFields;i++) {
-        final char[] text = mergeStates[i].text;
-        final int textOffset = mergeStates[i].textOffset;
-        final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
-
-        if (cmp < 0) {
-          termStates[0] = mergeStates[i];
-          numToMerge = 1;
-        } else if (cmp == 0)
-          termStates[numToMerge++] = mergeStates[i];
-      }
-
-      int df = 0;
-      int lastPayloadLength = -1;
-
-      int lastDoc = 0;
-
-      final char[] text = termStates[0].text;
-      final int start = termStates[0].textOffset;
-
-      long freqPointer = freqOut.getFilePointer();
-      long proxPointer = proxOut.getFilePointer();
-
-      skipListWriter.resetSkip();
-
-      // Now termStates has numToMerge FieldMergeStates
-      // which all share the same term.  Now we must
-      // interleave the docID streams.
-      while(numToMerge > 0) {
-        
-        if ((++df % skipInterval) == 0) {
-          skipListWriter.setSkipData(lastDoc, currentFieldStorePayloads, lastPayloadLength);
-          skipListWriter.bufferSkip(df);
-        }
-
-        DocumentsWriterFieldMergeState minState = termStates[0];
-        for(int i=1;i<numToMerge;i++)
-          if (termStates[i].docID < minState.docID)
-            minState = termStates[i];
-
-        final int doc = minState.docID;
-        final int termDocFreq = minState.termFreq;
-
-        assert doc < numDocsInRAM;
-        assert doc > lastDoc || df == 1;
-
-        final int newDocCode = (doc-lastDoc)<<1;
-        lastDoc = doc;
-
-        final ByteSliceReader prox = minState.prox;
-
-        // Carefully copy over the prox + payload info,
-        // changing the format to match Lucene's segment
-        // format.
-        for(int j=0;j<termDocFreq;j++) {
-          final int code = prox.readVInt();
-          if (currentFieldStorePayloads) {
-            final int payloadLength;
-            if ((code & 1) != 0) {
-              // This position has a payload
-              payloadLength = prox.readVInt();
-            } else
-              payloadLength = 0;
-            if (payloadLength != lastPayloadLength) {
-              proxOut.writeVInt(code|1);
-              proxOut.writeVInt(payloadLength);
-              lastPayloadLength = payloadLength;
-            } else
-              proxOut.writeVInt(code & (~1));
-            if (payloadLength > 0)
-              copyBytes(prox, proxOut, payloadLength);
-          } else {
-            assert 0 == (code & 1);
-            proxOut.writeVInt(code>>1);
-          }
-        }
-
-        if (1 == termDocFreq) {
-          freqOut.writeVInt(newDocCode|1);
-        } else {
-          freqOut.writeVInt(newDocCode);
-          freqOut.writeVInt(termDocFreq);
-        }
-
-        if (!minState.nextDoc()) {
-
-          // Remove from termStates
-          int upto = 0;
-          for(int i=0;i<numToMerge;i++)
-            if (termStates[i] != minState)
-              termStates[upto++] = termStates[i];
-          numToMerge--;
-          assert upto == numToMerge;
-
-          // Advance this state to the next term
-
-          if (!minState.nextTerm()) {
-            // OK, no more terms, so remove from mergeStates
-            // as well
-            upto = 0;
-            for(int i=0;i<numFields;i++)
-              if (mergeStates[i] != minState)
-                mergeStates[upto++] = mergeStates[i];
-            numFields--;
-            assert upto == numFields;
-          }
-        }
-      }
-
-      assert df > 0;
-
-      // Done merging this term
-
-      long skipPointer = skipListWriter.writeSkip(freqOut);
-
-      // Write term
-      termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
-
-      // TODO: we could do this incrementally
-      UnicodeUtil.UTF16toUTF8(text, start, termsUTF8);
-
-      // TODO: we could save O(n) re-scan of the term by
-      // computing the shared prefix with the last term
-      // while during the UTF8 encoding
-      termsOut.add(fieldNumber,
-                   termsUTF8.result,
-                   termsUTF8.length,
-                   termInfo);
+    if (docStoreSegment == null) {
+      docStoreSegment = segment;
+      assert numDocsInStore == 0;
     }
   }
 
-  synchronized void close() {
-    closed = true;
-    notifyAll();
-  }
-
   /** Returns a free (idle) ThreadState that may be used for
    * indexing this one document.  This call also pauses if a
    * flush is pending.  If delTerm is non-null then we
@@ -961,14 +652,16 @@
     // again.
     DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread());
     if (state == null) {
-      // First time this thread has called us since last flush
+
+      // First time this thread has called us since last
+      // flush.  Find the least loaded thread state:
       DocumentsWriterThreadState minThreadState = null;
       for(int i=0;i<threadStates.length;i++) {
         DocumentsWriterThreadState ts = threadStates[i];
         if (minThreadState == null || ts.numThreads < minThreadState.numThreads)
           minThreadState = ts;
       }
-      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length == MAX_THREAD_STATE)) {
+      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= MAX_THREAD_STATE)) {
         state = minThreadState;
         state.numThreads++;
       } else {
@@ -987,46 +680,49 @@
     // not be paused nor a flush pending:
     waitReady(state);
 
-    if (segment == null)
-      segment = writer.newSegmentName();
+    // Allocate segment name if this is the first doc since
+    // last flush:
+    initSegmentName(false);
 
     state.isIdle = false;
 
+    boolean success = false;
     try {
-      boolean success = false;
-      try {
-        state.init(doc, nextDocID);
-        if (delTerm != null) {
-          addDeleteTerm(delTerm, state.docID);
-          state.doFlushAfter = timeToFlushDeletes();
-        }
-        // Only increment nextDocID & numDocsInRAM on successful init
-        nextDocID++;
-        numDocsInRAM++;
-
-        // We must at this point commit to flushing to ensure we
-        // always get N docs when we flush by doc count, even if
-        // > 1 thread is adding documents:
-        if (!flushPending && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH
-            && numDocsInRAM >= maxBufferedDocs) {
-          flushPending = true;
-          state.doFlushAfter = true;
-        }
+      state.docState.docID = nextDocID;
 
-        success = true;
-      } finally {
-        if (!success) {
-          // Forcefully idle this ThreadState:
-          state.isIdle = true;
-          notifyAll();
-          if (state.doFlushAfter) {
-            state.doFlushAfter = false;
-            flushPending = false;
-          }
+      assert writer.testPoint("DocumentsWriter.ThreadState.init start");
+
+      if (delTerm != null) {
+        addDeleteTerm(delTerm, state.docState.docID);
+        state.doFlushAfter = timeToFlushDeletes();
+      }
+
+      assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm");
+
+      nextDocID++;
+      numDocsInRAM++;
+
+      // We must at this point commit to flushing to ensure we
+      // always get N docs when we flush by doc count, even if
+      // > 1 thread is adding documents:
+      if (!flushPending &&
+          maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH
+          && numDocsInRAM >= maxBufferedDocs) {
+        flushPending = true;
+        state.doFlushAfter = true;
+      }
+
+      success = true;
+    } finally {
+      if (!success) {
+        // Forcefully idle this ThreadState:
+        state.isIdle = true;
+        notifyAll();
+        if (state.doFlushAfter) {
+          state.doFlushAfter = false;
+          flushPending = false;
         }
       }
-    } catch (AbortException ae) {
-      abort(ae);
     }
 
     return state;
@@ -1049,25 +745,45 @@
 
     // This call is synchronized but fast
     final DocumentsWriterThreadState state = getThreadState(doc, delTerm);
+
+    final DocState docState = state.docState;
+    docState.doc = doc;
+    docState.analyzer = analyzer;
+
+    boolean success = false;
     try {
-      boolean success = false;
-      try {
-        try {
-          // This call is not synchronized and does all the work
-          state.processDocument(analyzer);
-        } finally {
-          // Note that we must call finishDocument even on
-          // exception, because for a non-aborting
-          // exception, a portion of the document has been
-          // indexed (and its ID is marked for deletion), so
-          // all index files must be updated to record this
-          // document.  This call is synchronized but fast.
-          finishDocument(state);
-        }
-        success = true;
-      } finally {
-        if (!success) {
-          synchronized(this) {
+      // This call is not synchronized and does all the
+      // work
+      final DocWriter perDoc = state.consumer.processDocument();
+        
+      // This call is synchronized but fast
+      finishDocument(state, perDoc);
+      success = true;
+    } finally {
+      if (!success) {
+        synchronized(this) {
+
+          if (aborting) {
+            state.isIdle = true;
+            notifyAll();
+            abort();
+          } else {
+            skipDocWriter.docID = docState.docID;
+            boolean success2 = false;
+            try {
+              waitQueue.add(skipDocWriter);
+              success2 = true;
+            } finally {
+              if (!success2) {
+                state.isIdle = true;
+                notifyAll();
+                abort();
+                return false;
+              }
+            }
+
+            state.isIdle = true;
+            notifyAll();
 
             // If this thread state had decided to flush, we
             // must clear it so another thread can flush
@@ -1081,12 +797,10 @@
             // since likely it was partially added.  This
             // keeps indexing as "all or none" (atomic) when
             // adding a document:
-            addDeleteDocID(state.docID);
+            addDeleteDocID(state.docState.docID);
           }
         }
       }
-    } catch (AbortException ae) {
-      abort(ae);
     }
 
     return state.doFlushAfter || timeToFlushDeletes();
@@ -1114,12 +828,14 @@
   }
 
   synchronized private void waitReady(DocumentsWriterThreadState state) {
-    while(!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || abortCount > 0))
+
+    while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) {
       try {
         wait();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
+    }
 
     if (closed)
       throw new AlreadyClosedException("this IndexWriter is closed");
@@ -1297,66 +1013,83 @@
     deletesInRAM.queries.put(query, new Integer(flushedDocCount + docID));
   }
 
+  synchronized boolean doBalanceRAM() {
+    return ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed >= ramBufferSize || numBytesAlloc >= freeTrigger);
+  }
+
   /** Does the synchronized work to finish/flush the
-   * inverted document. */
-  private synchronized void finishDocument(DocumentsWriterThreadState state) throws IOException, AbortException {
-    if (abortCount > 0) {
-      // Forcefully idle this threadstate -- its state will
-      // be reset by abort()
-      state.isIdle = true;
-      notifyAll();
-      return;
-    }
+   *  inverted document. */
+  private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
 
-    if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
-        && numBytesUsed >= ramBufferSize)
+    if (doBalanceRAM())
+      // Must call this w/o holding synchronized(this) else
+      // we'll hit deadlock:
       balanceRAM();
 
-    // Now write the indexed document to the real files.
-    if (nextWriteDocID == state.docID) {
-      // It's my turn, so write everything now:
-      nextWriteDocID++;
-      state.writeDocument();
-      state.isIdle = true;
-      notifyAll();
+    synchronized(this) {
 
-      // If any states were waiting on me, sweep through and
-      // flush those that are enabled by my write.
-      if (numWaiting > 0) {
-        boolean any = true;
-        while(any) {
-          any = false;
-          for(int i=0;i<numWaiting;) {
-            final DocumentsWriterThreadState s = waitingThreadStates[i];
-            if (s.docID == nextWriteDocID) {
-              s.writeDocument();
-              s.isIdle = true;
-              nextWriteDocID++;
-              any = true;
-              if (numWaiting > i+1)
-                // Swap in the last waiting state to fill in
-                // the hole we just created.  It's important
-                // to do this as-we-go and not at the end of
-                // the loop, because if we hit an aborting
-                // exception in one of the s.writeDocument
-                // calls (above), it leaves this array in an
-                // inconsistent state:
-                waitingThreadStates[i] = waitingThreadStates[numWaiting-1];
-              numWaiting--;
-            } else {
-              assert !s.isIdle;
-              i++;
-            }
+      assert docWriter == null || docWriter.docID == perThread.docState.docID;
+
+
+      if (aborting) {
+
+        // We are currently aborting, and another thread is
+        // waiting for me to become idle.  We just forcefully
+        // idle this threadState; it will be fully reset by
+        // abort()
+        if (docWriter != null)
+          try {
+            docWriter.abort();
+          } catch (Throwable t) {
           }
-        }
+
+        perThread.isIdle = true;
+        notifyAll();
+        return;
       }
-    } else {
-      // Another thread got a docID before me, but, it
-      // hasn't finished its processing.  So add myself to
-      // the line but don't hold up this thread.
-      waitingThreadStates[numWaiting++] = state;
+
+      final boolean doPause;
+
+      if (docWriter != null)
+        doPause = waitQueue.add(docWriter);
+      else {
+        skipDocWriter.docID = perThread.docState.docID;
+        doPause = waitQueue.add(skipDocWriter);
+      }
+
+      if (doPause)
+        waitForWaitQueue();
+
+      if (bufferIsFull && !flushPending) {
+        flushPending = true;
+        perThread.doFlushAfter = true;
+      }
+
+      perThread.isIdle = true;
+      notifyAll();
+    }
+  }
+
+  synchronized void waitForWaitQueue() {
+    do {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    } while (!waitQueue.doResume());
+  }
+
+  private static class SkipDocWriter extends DocWriter {
+    void finish() {
+    }
+    void abort() {
+    }
+    long sizeInBytes() {
+      return 0;
     }
   }
+  final SkipDocWriter skipDocWriter = new SkipDocWriter();
 
   long getRAMUsed() {
     return numBytesUsed;
@@ -1367,35 +1100,10 @@
 
   NumberFormat nf = NumberFormat.getInstance();
 
-  /* Used only when writing norms to fill in default norm
-   * value into the holes in docID stream for those docs
-   * that didn't have this field. */
-  static void fillBytes(IndexOutput out, byte b, int numBytes) throws IOException {
-    for(int i=0;i<numBytes;i++)
-      out.writeByte(b);
-  }
-
-  final byte[] copyByteBuffer = new byte[4096];
-
-  /** Copy numBytes from srcIn to destIn */
-  void copyBytes(IndexInput srcIn, IndexOutput destIn, long numBytes) throws IOException {
-    // TODO: we could do this more efficiently (save a copy)
-    // because it's always from a ByteSliceReader ->
-    // IndexOutput
-    while(numBytes > 0) {
-      final int chunk;
-      if (numBytes > 4096)
-        chunk = 4096;
-      else
-        chunk = (int) numBytes;
-      srcIn.readBytes(copyByteBuffer, 0, chunk);
-      destIn.writeBytes(copyByteBuffer, 0, chunk);
-      numBytes -= chunk;
-    }
-  }
-
-  // Used only when infoStream != null
+  // TODO FI: this is not flexible -- we can't hardwire
+  // extensions in here:
   private long segmentSize(String segmentName) throws IOException {
+    // Used only when infoStream != null
     assert infoStream != null;
     
     long size = directory.fileLength(segmentName + ".tii") +
@@ -1410,66 +1118,16 @@
     return size;
   }
 
-  final private static int POINTER_NUM_BYTE = 4;
-  final private static int INT_NUM_BYTE = 4;
-  final private static int CHAR_NUM_BYTE = 2;
-
-  // Why + 5*POINTER_NUM_BYTE below?
-  //   1: Posting has "vector" field which is a pointer
-  //   2: Posting is referenced by postingsFreeList array
-  //   3,4,5: Posting is referenced by postings hash, which
-  //          targets 25-50% fill factor; approximate this
-  //          as 3X # pointers
-  final static int POSTING_NUM_BYTE = OBJECT_HEADER_BYTES + 9*INT_NUM_BYTE + 5*POINTER_NUM_BYTE;
-
-  // Holds free pool of Posting instances
-  private Posting[] postingsFreeList;
-  private int postingsFreeCount;
-  private int postingsAllocCount;
-
-  /* Allocate more Postings from shared pool */
-  synchronized void getPostings(Posting[] postings) {
-    numBytesUsed += postings.length * POSTING_NUM_BYTE;
-    final int numToCopy;
-    if (postingsFreeCount < postings.length)
-      numToCopy = postingsFreeCount;
-    else
-      numToCopy = postings.length;
-    final int start = postingsFreeCount-numToCopy;
-    System.arraycopy(postingsFreeList, start,
-                     postings, 0, numToCopy);
-    postingsFreeCount -= numToCopy;
-
-    // Directly allocate the remainder if any
-    if (numToCopy < postings.length) {
-      final int extra = postings.length - numToCopy;
-      final int newPostingsAllocCount = postingsAllocCount + extra;
-      if (newPostingsAllocCount > postingsFreeList.length)
-        postingsFreeList = new Posting[(int) (1.25 * newPostingsAllocCount)];
-
-      balanceRAM();
-      for(int i=numToCopy;i<postings.length;i++) {
-        postings[i] = new Posting();
-        numBytesAlloc += POSTING_NUM_BYTE;
-        postingsAllocCount++;
-      }
-    }
-    assert numBytesUsed <= numBytesAlloc;
-  }
-
-  synchronized void recyclePostings(Posting[] postings, int numPostings) {
-    // Move all Postings from this ThreadState back to our
-    // free list.  We pre-allocated this array while we were
-    // creating Postings to make sure it's large enough
-    assert postingsFreeCount + numPostings <= postingsFreeList.length;
-    System.arraycopy(postings, 0, postingsFreeList, postingsFreeCount, numPostings);
-    postingsFreeCount += numPostings;
-  }
+  // Coarse estimates used to measure RAM usage of buffered deletes
+  final static int OBJECT_HEADER_BYTES = 8;
+  final static int POINTER_NUM_BYTE = 4;
+  final static int INT_NUM_BYTE = 4;
+  final static int CHAR_NUM_BYTE = 2;
 
   /* Initial chunks size of the shared byte[] blocks used to
      store postings data */
   final static int BYTE_BLOCK_SHIFT = 15;
-  final static int BYTE_BLOCK_SIZE = (int) Math.pow(2.0, BYTE_BLOCK_SHIFT);
+  final static int BYTE_BLOCK_SIZE = (int) (1 << BYTE_BLOCK_SHIFT);
   final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
   final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
 
@@ -1483,8 +1141,13 @@
         final int size = freeByteBlocks.size();
         final byte[] b;
         if (0 == size) {
+          // Always record a block allocated, even if
+          // trackAllocations is false.  This is necessary
+          // because this block will be shared between
+          // things that don't track allocations (term
+          // vectors) and things that do (freq/prox
+          // postings).
           numBytesAlloc += BYTE_BLOCK_SIZE;
-          balanceRAM();
           b = new byte[BYTE_BLOCK_SIZE];
         } else
           b = (byte[]) freeByteBlocks.remove(size-1);
@@ -1504,12 +1167,57 @@
     }
   }
 
+  /* Initial chunks size of the shared int[] blocks used to
+     store postings data */
+  final static int INT_BLOCK_SHIFT = 13;
+  final static int INT_BLOCK_SIZE = (int) (1 << INT_BLOCK_SHIFT);
+  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
+
+  private ArrayList freeIntBlocks = new ArrayList();
+
+  /* Allocate another int[] from the shared pool */
+  synchronized int[] getIntBlock(boolean trackAllocations) {
+    final int size = freeIntBlocks.size();
+    final int[] b;
+    if (0 == size) {
+      // Always record a block allocated, even if
+      // trackAllocations is false.  This is necessary
+      // because this block will be shared between
+      // things that don't track allocations (term
+      // vectors) and things that do (freq/prox
+      // postings).
+      numBytesAlloc += INT_BLOCK_SIZE*INT_NUM_BYTE;
+      b = new int[INT_BLOCK_SIZE];
+    } else
+      b = (int[]) freeIntBlocks.remove(size-1);
+    if (trackAllocations)
+      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
+    assert numBytesUsed <= numBytesAlloc;
+    return b;
+  }
+
+  synchronized void bytesAllocated(long numBytes) {
+    numBytesAlloc += numBytes;
+    assert numBytesUsed <= numBytesAlloc;
+  }
+
+  synchronized void bytesUsed(long numBytes) {
+    numBytesUsed += numBytes;
+    assert numBytesUsed <= numBytesAlloc;
+  }
+
+  /* Return int[]s to the pool */
+  synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
+    for(int i=start;i<end;i++)
+      freeIntBlocks.add(blocks[i]);
+  }
+
   ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator();
 
   /* Initial chunk size of the shared char[] blocks used to
      store term text */
   final static int CHAR_BLOCK_SHIFT = 14;
-  final static int CHAR_BLOCK_SIZE = (int) Math.pow(2.0, CHAR_BLOCK_SHIFT);
+  final static int CHAR_BLOCK_SIZE = (int) (1 << CHAR_BLOCK_SHIFT);
   final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
 
   final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
@@ -1522,16 +1230,19 @@
     final char[] c;
     if (0 == size) {
       numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
-      balanceRAM();
       c = new char[CHAR_BLOCK_SIZE];
     } else
       c = (char[]) freeCharBlocks.remove(size-1);
+    // We always track allocations of char blocks, for now,
+    // because nothing that skips allocation tracking
+    // (currently only term vectors) uses its own char
+    // blocks.
     numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
     assert numBytesUsed <= numBytesAlloc;
     return c;
   }
 
-  /* Return a char[] to the pool */
+  /* Return char[]s to the pool */
   synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
     for(int i=0;i<numBlocks;i++)
       freeCharBlocks.add(blocks[i]);
@@ -1552,76 +1263,70 @@
    * the other two.  This method just frees allocations from
    * the pools once we are over-budget, which balances the
    * pools to match the current docs. */
-  synchronized void balanceRAM() {
-
-    if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH || bufferIsFull)
-      return;
+  void balanceRAM() {
 
-    // We free our allocations if we've allocated 5% over
-    // our allowed RAM buffer
-    final long freeTrigger = (long) (1.05 * ramBufferSize);
-    final long freeLevel = (long) (0.95 * ramBufferSize);
-    
     // We flush when we've used our target usage
     final long flushTrigger = (long) ramBufferSize;
 
     if (numBytesAlloc > freeTrigger) {
+
       if (infoStream != null)
         message("  RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
                 " vs trigger=" + toMB(flushTrigger) +
                 " allocMB=" + toMB(numBytesAlloc) +
                 " vs trigger=" + toMB(freeTrigger) +
-                " postingsFree=" + toMB(postingsFreeCount*POSTING_NUM_BYTE) +
                 " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
                 " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));
 
-      // When we've crossed 100% of our target Postings
-      // RAM usage, try to free up until we're back down
-      // to 95%
       final long startBytesAlloc = numBytesAlloc;
 
-      final int postingsFreeChunk = (int) (BYTE_BLOCK_SIZE / POSTING_NUM_BYTE);
-
       int iter = 0;
 
-      // We free equally from each pool in 64 KB
+      // We free equally from each pool in 32 KB
       // chunks until we are below our threshold
       // (freeLevel)
 
+      boolean any = true;
+
       while(numBytesAlloc > freeLevel) {
-        if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == postingsFreeCount) {
-          // Nothing else to free -- must flush now.
-          bufferIsFull = true;
-          if (infoStream != null)
-            message("    nothing to free; now set bufferIsFull");
-          break;
-        }
+      
+        synchronized(this) {
+          if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == freeIntBlocks.size() && !any) {
+            // Nothing else to free -- must flush now.
+            bufferIsFull = numBytesUsed > flushTrigger;
+            if (infoStream != null) {
+              if (numBytesUsed > flushTrigger)
+                message("    nothing to free; now set bufferIsFull");
+              else
+                message("    nothing to free");
+            }
+            assert numBytesUsed <= numBytesAlloc;
+            break;
+          }
 
-        if ((0 == iter % 3) && byteBlockAllocator.freeByteBlocks.size() > 0) {
-          byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
-          numBytesAlloc -= BYTE_BLOCK_SIZE;
-        }
+          if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) {
+            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
+            numBytesAlloc -= BYTE_BLOCK_SIZE;
+          }
 
-        if ((1 == iter % 3) && freeCharBlocks.size() > 0) {
-          freeCharBlocks.remove(freeCharBlocks.size()-1);
-          numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
-        }
+          if ((1 == iter % 4) && freeCharBlocks.size() > 0) {
+            freeCharBlocks.remove(freeCharBlocks.size()-1);
+            numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
+          }
 
-        if ((2 == iter % 3) && postingsFreeCount > 0) {
-          final int numToFree;
-          if (postingsFreeCount >= postingsFreeChunk)
-            numToFree = postingsFreeChunk;
-          else
-            numToFree = postingsFreeCount;
-          Arrays.fill(postingsFreeList, postingsFreeCount-numToFree, postingsFreeCount, null);
-          postingsFreeCount -= numToFree;
-          postingsAllocCount -= numToFree;
-          numBytesAlloc -= numToFree * POSTING_NUM_BYTE;
+          if ((2 == iter % 4) && freeIntBlocks.size() > 0) {
+            freeIntBlocks.remove(freeIntBlocks.size()-1);
+            numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
+          }
         }
 
+        if ((3 == iter % 4) && any)
+          // Ask consumer to free any recycled state
+          any = consumer.freeRAM();
+
         iter++;
       }
-      
+
       if (infoStream != null)
         message("    after free: freedMB=" + nf.format((startBytesAlloc-numBytesAlloc)/1024./1024.) + " usedMB=" + nf.format(numBytesUsed/1024./1024.) + " allocMB=" + nf.format(numBytesAlloc/1024./1024.));
       
@@ -1631,14 +1336,132 @@
       // using, go ahead and flush.  This prevents
       // over-allocating and then freeing, with every
       // flush.
-      if (numBytesUsed > flushTrigger) {
-        if (infoStream != null)
-          message("  RAM: now flush @ usedMB=" + nf.format(numBytesUsed/1024./1024.) +
-                  " allocMB=" + nf.format(numBytesAlloc/1024./1024.) +
-                  " triggerMB=" + nf.format(flushTrigger/1024./1024.));
+      synchronized(this) {
 
-        bufferIsFull = true;
+        if (numBytesUsed > flushTrigger) {
+          if (infoStream != null)
+            message("  RAM: now flush @ usedMB=" + nf.format(numBytesUsed/1024./1024.) +
+                    " allocMB=" + nf.format(numBytesAlloc/1024./1024.) +
+                    " triggerMB=" + nf.format(flushTrigger/1024./1024.));
+
+          bufferIsFull = true;
+        }
       }
     }
   }
+
+  final WaitQueue waitQueue = new WaitQueue();
+
+  private class WaitQueue { // Parks finished docs until every lower docID has been written.
+    DocWriter[] waiting; // parked docs, indexed circularly starting at nextWriteLoc
+    int nextWriteDocID; // docID that has to be written next
+    int nextWriteLoc; // index in waiting[] of the slot belonging to nextWriteDocID
+    int numWaiting; // number of docs currently parked in waiting[]
+    long waitingBytes; // sum of sizeInBytes() over the parked docs
+
+    public WaitQueue() {
+      waiting = new DocWriter[10]; // initial capacity; add() grows the array on demand
+    }
+
+    synchronized void reset() { // Restarts docID numbering at 0; asserts the queue is empty.
+      // NOTE: nextWriteLoc doesn't need to be reset
+      assert numWaiting == 0;
+      assert waitingBytes == 0;
+      nextWriteDocID = 0;
+    }
+
+    synchronized boolean doResume() { // true when parked bytes are at or below waitQueueResumeBytes
+      return waitingBytes <= waitQueueResumeBytes;
+    }
+
+    synchronized boolean doPause() { // true when parked bytes exceed waitQueuePauseBytes
+      return waitingBytes > waitQueuePauseBytes;
+    }
+
+    synchronized void abort() { // Calls abort() on every parked doc, clears the slots, zeroes the counters.
+      int count = 0;
+      for(int i=0;i<waiting.length;i++) {
+        final DocWriter doc = waiting[i];
+        if (doc != null) {
+          doc.abort();
+          waiting[i] = null;
+          count++;
+        }
+      }
+      waitingBytes = 0;
+      assert count == numWaiting; // every parked doc must have been found in the array
+      numWaiting = 0;
+    }
+
+    private void writeDocument(DocWriter doc) throws IOException { // Finishes doc, then advances the write position.
+      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
+      boolean success = false;
+      try {
+        doc.finish();
+        nextWriteDocID++;
+        numDocsInStore++;
+        nextWriteLoc++;
+        assert nextWriteLoc <= waiting.length;
+        if (nextWriteLoc == waiting.length) // wrap around to the start of the array
+          nextWriteLoc = 0;
+        success = true;
+      } finally {
+        if (!success) // something inside the try block threw before success was set
+          setAborting();
+      }
+    }
+
+    synchronized public boolean add(DocWriter doc) throws IOException { // Writes doc now or parks it; returns doPause().
+
+      assert doc.docID >= nextWriteDocID; // a docID below the write position is not accepted
+
+      if (doc.docID == nextWriteDocID) {
+        writeDocument(doc);
+        while(true) { // drain parked docs that have now become next in line
+          doc = waiting[nextWriteLoc];
+          if (doc != null) {
+            numWaiting--;
+            waiting[nextWriteLoc] = null;
+            waitingBytes -= doc.sizeInBytes();
+            writeDocument(doc);
+          } else // first empty slot: the doc with nextWriteDocID has not arrived yet
+            break;
+        }
+      } else {
+
+        // I finished before documents that were added
+        // before me.  This can easily happen when I am a
+        // small doc and the docs before me were large, or
+        // simply due to luck in the thread scheduling.  Just
+        // add myself to the queue and when the doc with
+        // nextWriteDocID finishes, it will flush me:
+        int gap = doc.docID - nextWriteDocID;
+        if (gap >= waiting.length) {
+          // Grow queue, copying so that the slot at the old nextWriteLoc lands at index 0
+          DocWriter[] newArray = new DocWriter[ArrayUtil.getNextSize(gap)]; // NOTE(review): assumes result > gap -- confirm
+          assert nextWriteLoc >= 0;
+          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
+          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
+          nextWriteLoc = 0;
+          waiting = newArray;
+          gap = doc.docID - nextWriteDocID; // same value as above; neither operand has changed
+        }
+
+        int loc = nextWriteLoc + gap; // slot for this doc, before wrap-around
+        if (loc >= waiting.length)
+          loc -= waiting.length;
+
+        // We should only wrap one time
+        assert loc < waiting.length;
+
+        // Nobody should be in my spot!
+        assert waiting[loc] == null;
+        waiting[loc] = doc;
+        numWaiting++;
+        waitingBytes += doc.sizeInBytes();
+      }
+      
+      return doPause(); // tells the caller whether parked bytes now exceed the pause threshold
+    }
+  }
 }



Mime
View raw message