From: busc...@apache.org
Subject: svn commit: r966168 [1/4] - in /lucene/dev: branches/realtime_search/lucene/src/java/org/apache/lucene/index/ branches/realtime_search/lucene/src/java/org/apache/lucene/util/ branches/realtime_search/lucene/src/test/org/apache/lucene/index/ trunk/lucen...
Date: Wed, 21 Jul 2010 10:27:22 GMT
Author: buschmi
Date: Wed Jul 21 10:27:20 2010
New Revision: 966168

URL: http://svn.apache.org/viewvc?rev=966168&view=rev
Log:
LUCENE-2324: Committing second version of the patch to the real-time branch.  It's not done yet, but easier to track progress using the branch.

Added:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/util/ThreadSafeCloneableSortedMap.java
Removed:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxFieldMergeState.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java
Modified:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestByteSlices.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,70 @@
+package org.apache.lucene.index;
+
+import java.util.TreeMap;
+
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.ThreadSafeCloneableSortedMap;
+
+public class BufferedDeletesInRAM {
+  static class Delete {
+    int flushCount;
+
+    public Delete(int flushCount) {
+      this.flushCount = flushCount;
+    }
+  }
+
+  final static class DeleteTerm extends Delete {
+    final Term term;
+
+    public DeleteTerm(Term term, int flushCount) {
+      super(flushCount);
+      this.term = term;
+    }
+  }
+
+  final static class DeleteTerms extends Delete {
+    final Term[] terms;
+
+    public DeleteTerms(Term[] terms, int flushCount) {
+      super(flushCount);
+      this.terms = terms;
+    }
+  }
+  
+  final static class DeleteQuery extends Delete {
+    final Query query;
+
+    public DeleteQuery(Query query, int flushCount) {
+      super(flushCount);
+      this.query = query;
+    }
+  }
+
+  final ThreadSafeCloneableSortedMap<Long, Delete> deletes = ThreadSafeCloneableSortedMap
+      .getThreadSafeSortedMap(new TreeMap<Long, Delete>());
+
+  final void addDeleteTerm(Term term, long sequenceID, int numThreadStates) {
+    deletes.put(sequenceID, new DeleteTerm(term, numThreadStates));
+  }
+
+  final void addDeleteTerms(Term[] terms, long sequenceID, int numThreadStates) {
+    deletes.put(sequenceID, new DeleteTerms(terms, numThreadStates));
+  }
+
+  final void addDeleteQuery(Query query, long sequenceID, int numThreadStates) {
+    deletes.put(sequenceID, new DeleteQuery(query, numThreadStates));
+  }
+
+  boolean hasDeletes() {
+    return !deletes.isEmpty();
+  }
+
+  void clear() {
+    deletes.clear();
+  }
+
+  int getNumDeletes() {
+    return this.deletes.size();
+  }
+}

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java Wed Jul 21 10:27:20 2010
@@ -50,10 +50,10 @@ final class ByteBlockPool {
   public byte[][] buffers = new byte[10][];
 
   int bufferUpto = -1;                        // Which buffer we are upto
-  public int byteUpto = DocumentsWriter.BYTE_BLOCK_SIZE;             // Where we are in head buffer
+  public int byteUpto = DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;             // Where we are in head buffer
 
   public byte[] buffer;                              // Current head buffer
-  public int byteOffset = -DocumentsWriter.BYTE_BLOCK_SIZE;          // Current head offset
+  public int byteOffset = -DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;          // Current head offset
 
   private final Allocator allocator;
 
@@ -95,11 +95,11 @@ final class ByteBlockPool {
     bufferUpto++;
 
     byteUpto = 0;
-    byteOffset += DocumentsWriter.BYTE_BLOCK_SIZE;
+    byteOffset += DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
   }
 
   public int newSlice(final int size) {
-    if (byteUpto > DocumentsWriter.BYTE_BLOCK_SIZE-size)
+    if (byteUpto > DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-size)
       nextBuffer();
     final int upto = byteUpto;
     byteUpto += size;
@@ -123,7 +123,7 @@ final class ByteBlockPool {
     final int newSize = levelSizeArray[newLevel];
 
     // Maybe allocate another block
-    if (byteUpto > DocumentsWriter.BYTE_BLOCK_SIZE-newSize)
+    if (byteUpto > DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-newSize)
       nextBuffer();
 
     final int newUpto = byteUpto;
@@ -151,8 +151,8 @@ final class ByteBlockPool {
   // Fill in a BytesRef from term's length & bytes encoded in
   // byte block
   final BytesRef setBytesRef(BytesRef term, int textStart) {
-    final byte[] bytes = term.bytes = buffers[textStart >> DocumentsWriter.BYTE_BLOCK_SHIFT];
-    int pos = textStart & DocumentsWriter.BYTE_BLOCK_MASK;
+    final byte[] bytes = term.bytes = buffers[textStart >> DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
+    int pos = textStart & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
     if ((bytes[pos] & 0x80) == 0) {
       // length is 1 byte
       term.length = bytes[pos];

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java Wed Jul 21 10:27:20 2010
@@ -48,16 +48,16 @@ final class ByteSliceReader extends Data
     this.endIndex = endIndex;
 
     level = 0;
-    bufferUpto = startIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
-    bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
+    bufferUpto = startIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
+    bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
     buffer = pool.buffers[bufferUpto];
-    upto = startIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+    upto = startIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
 
     final int firstSize = ByteBlockPool.levelSizeArray[0];
 
     if (startIndex+firstSize >= endIndex) {
       // There is only this one slice to read
-      limit = endIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+      limit = endIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
     } else
       limit = upto+firstSize-4;
   }
@@ -102,11 +102,11 @@ final class ByteSliceReader extends Data
     level = ByteBlockPool.nextLevelArray[level];
     final int newSize = ByteBlockPool.levelSizeArray[level];
 
-    bufferUpto = nextIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
-    bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
+    bufferUpto = nextIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
+    bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
 
     buffer = pool.buffers[bufferUpto];
-    upto = nextIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+    upto = nextIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
 
     if (nextIndex + newSize >= endIndex) {
       // We are advancing to the final slice

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java Wed Jul 21 10:27:20 2010
@@ -42,9 +42,9 @@ final class ByteSliceWriter extends Data
    * Set up the writer to write at address.
    */
   public void init(int address) {
-    slice = pool.buffers[address >> DocumentsWriter.BYTE_BLOCK_SHIFT];
+    slice = pool.buffers[address >> DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
     assert slice != null;
-    upto = address & DocumentsWriter.BYTE_BLOCK_MASK;
+    upto = address & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
     offset0 = address;
     assert upto < slice.length;
   }
@@ -80,6 +80,6 @@ final class ByteSliceWriter extends Data
   }
 
   public int getAddress() {
-    return upto + (offset0 & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
+    return upto + (offset0 & DocumentsWriterRAMAllocator.BYTE_BLOCK_NOT_MASK);
   }
 }
\ No newline at end of file

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java Wed Jul 21 10:27:20 2010
@@ -18,11 +18,10 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 
 abstract class DocConsumer {
-  abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
-  abstract void flush(final Collection<DocConsumerPerThread> threads, final SegmentWriteState state) throws IOException;
+  abstract DocumentsWriterPerThread.DocWriter processDocument() throws IOException;
+  abstract void flush(final SegmentWriteState state) throws IOException;
   abstract void closeDocStore(final SegmentWriteState state) throws IOException;
   abstract void abort();
   abstract boolean freeRAM();

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Wed Jul 21 10:27:20 2010
@@ -18,7 +18,6 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
 
 abstract class DocFieldConsumer {
@@ -27,7 +26,7 @@ abstract class DocFieldConsumer {
 
   /** Called when DocumentsWriter decides to create a new
    *  segment */
-  abstract void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+  abstract void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
 
   /** Called when DocumentsWriter decides to close the doc
    *  stores */
@@ -36,14 +35,17 @@ abstract class DocFieldConsumer {
   /** Called when an aborting exception is hit */
   abstract void abort();
 
-  /** Add a new thread */
-  abstract DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException;
-
   /** Called when DocumentsWriter is using too much RAM.
    *  The consumer should free RAM, if possible, returning
    *  true if any RAM was in fact freed. */
   abstract boolean freeRAM();
+  
+  abstract void startDocument() throws IOException;
 
+  abstract DocFieldConsumerPerField addField(FieldInfo fi);
+  
+  abstract DocumentsWriterPerThread.DocWriter finishDocument() throws IOException;
+  
   void setFieldInfos(FieldInfos fieldInfos) {
     this.fieldInfos = fieldInfos;
   }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java Wed Jul 21 10:27:20 2010
@@ -24,4 +24,5 @@ abstract class DocFieldConsumerPerField 
   /** Processes all occurrences of a single field */
   abstract void processFields(Fieldable[] fields, int count) throws IOException;
   abstract void abort();
+  abstract FieldInfo getFieldInfo();
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java Wed Jul 21 10:27:20 2010
@@ -17,12 +17,9 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
+import java.io.IOException;
 import java.util.HashMap;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.HashSet;
-import java.io.IOException;
 
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.RamUsageEstimator;
@@ -33,10 +30,12 @@ import org.apache.lucene.util.RamUsageEs
 final class DocFieldConsumers extends DocFieldConsumer {
   final DocFieldConsumer one;
   final DocFieldConsumer two;
+  final DocumentsWriterPerThread.DocState docState;
 
-  public DocFieldConsumers(DocFieldConsumer one, DocFieldConsumer two) {
+  public DocFieldConsumers(DocFieldProcessor processor, DocFieldConsumer one, DocFieldConsumer two) {
     this.one = one;
     this.two = two;
+    this.docState = processor.docState;
   }
 
   @Override
@@ -47,33 +46,19 @@ final class DocFieldConsumers extends Do
   }
 
   @Override
-  public void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
-    Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> oneThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
-    Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> twoThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
-
-    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet()) {
+  public void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
 
-      final DocFieldConsumersPerThread perThread = (DocFieldConsumersPerThread) entry.getKey();
+    Map<FieldInfo, DocFieldConsumerPerField> oneFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+    Map<FieldInfo, DocFieldConsumerPerField> twoFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
 
-      final Collection<DocFieldConsumerPerField> fields = entry.getValue();
-
-      Iterator<DocFieldConsumerPerField> fieldsIt = fields.iterator();
-      Collection<DocFieldConsumerPerField> oneFields = new HashSet<DocFieldConsumerPerField>();
-      Collection<DocFieldConsumerPerField> twoFields = new HashSet<DocFieldConsumerPerField>();
-      while(fieldsIt.hasNext()) {
-        DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldsIt.next();
-        oneFields.add(perField.one);
-        twoFields.add(perField.two);
-      }
-
-      oneThreadsAndFields.put(perThread.one, oneFields);
-      twoThreadsAndFields.put(perThread.two, twoFields);
+    for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+      DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldToFlush.getValue();
+      oneFieldsToFlush.put(fieldToFlush.getKey(), perField.one);
+      twoFieldsToFlush.put(fieldToFlush.getKey(), perField.two);
     }
-    
 
-    one.flush(oneThreadsAndFields, state);
-    two.flush(twoThreadsAndFields, state);
+    one.flush(oneFieldsToFlush, state);
+    two.flush(twoFieldsToFlush, state);
   }
 
   @Override
@@ -101,16 +86,11 @@ final class DocFieldConsumers extends Do
     return any;
   }
 
-  @Override
-  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException {
-    return new DocFieldConsumersPerThread(docFieldProcessorPerThread, this, one.addThread(docFieldProcessorPerThread), two.addThread(docFieldProcessorPerThread));
-  }
-
   PerDoc[] docFreeList = new PerDoc[1];
   int freeCount;
   int allocCount;
 
-  synchronized PerDoc getPerDoc() {
+  PerDoc getPerDoc() {
     if (freeCount == 0) {
       allocCount++;
       if (allocCount > docFreeList.length) {
@@ -125,15 +105,15 @@ final class DocFieldConsumers extends Do
       return docFreeList[--freeCount];
   }
 
-  synchronized void freePerDoc(PerDoc perDoc) {
+  void freePerDoc(PerDoc perDoc) {
     assert freeCount < docFreeList.length;
     docFreeList[freeCount++] = perDoc;
   }
 
-  class PerDoc extends DocumentsWriter.DocWriter {
+  class PerDoc extends DocumentsWriterPerThread.DocWriter {
 
-    DocumentsWriter.DocWriter writerOne;
-    DocumentsWriter.DocWriter writerTwo;
+    DocumentsWriterPerThread.DocWriter writerOne;
+    DocumentsWriterPerThread.DocWriter writerTwo;
 
     @Override
     public long sizeInBytes() {
@@ -166,4 +146,35 @@ final class DocFieldConsumers extends Do
       }
     }
   }
+  
+  @Override
+  public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
+    final DocumentsWriterPerThread.DocWriter oneDoc = one.finishDocument();
+    final DocumentsWriterPerThread.DocWriter twoDoc = two.finishDocument();
+    if (oneDoc == null)
+      return twoDoc;
+    else if (twoDoc == null)
+      return oneDoc;
+    else {
+      DocFieldConsumers.PerDoc both = getPerDoc();
+      both.docID = docState.docID;
+      assert oneDoc.docID == docState.docID;
+      assert twoDoc.docID == docState.docID;
+      both.writerOne = oneDoc;
+      both.writerTwo = twoDoc;
+      return both;
+    }
+  }
+  
+  @Override
+  public void startDocument() throws IOException {
+    one.startDocument();
+    two.startDocument();
+  }
+  
+  @Override
+  public DocFieldConsumerPerField addField(FieldInfo fi) {
+    return new DocFieldConsumersPerField(this, fi, one.addField(fi), two.addField(fi));
+  }
+
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java Wed Jul 21 10:27:20 2010
@@ -24,12 +24,14 @@ final class DocFieldConsumersPerField ex
 
   final DocFieldConsumerPerField one;
   final DocFieldConsumerPerField two;
-  final DocFieldConsumersPerThread perThread;
+  final DocFieldConsumers parent;
+  final FieldInfo fieldInfo;
 
-  public DocFieldConsumersPerField(DocFieldConsumersPerThread perThread, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
-    this.perThread = perThread;
+  public DocFieldConsumersPerField(DocFieldConsumers parent, FieldInfo fi, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
+    this.parent = parent;
     this.one = one;
     this.two = two;
+    this.fieldInfo = fi;
   }
 
   @Override
@@ -46,4 +48,9 @@ final class DocFieldConsumersPerField ex
       two.abort();
     }
   }
+
+  @Override
+  FieldInfo getFieldInfo() {
+    return fieldInfo;
+  }
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Wed Jul 21 10:27:20 2010
@@ -19,8 +19,15 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
 
 
 /**
@@ -33,13 +40,27 @@ import java.util.HashMap;
 
 final class DocFieldProcessor extends DocConsumer {
 
-  final DocumentsWriter docWriter;
   final FieldInfos fieldInfos = new FieldInfos();
   final DocFieldConsumer consumer;
   final StoredFieldsWriter fieldsWriter;
 
-  public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
-    this.docWriter = docWriter;
+  // Holds all fields seen in current doc
+  DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
+  int fieldCount;
+
+  // Hash table for all fields ever seen
+  DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
+  int hashMask = 1;
+  int totalFieldCount;
+
+  
+  float docBoost;
+  int fieldGen;
+  final DocumentsWriterPerThread.DocState docState;
+
+
+  public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
+    this.docState = docWriter.docState;
     this.consumer = consumer;
     consumer.setFieldInfos(fieldInfos);
     fieldsWriter = new StoredFieldsWriter(docWriter, fieldInfos);
@@ -52,16 +73,17 @@ final class DocFieldProcessor extends Do
   }
 
   @Override
-  public void flush(Collection<DocConsumerPerThread> threads, SegmentWriteState state) throws IOException {
+  public void flush(SegmentWriteState state) throws IOException {
 
-    Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
-    for ( DocConsumerPerThread thread : threads) {
-      DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
-      childThreadsAndFields.put(perThread.consumer, perThread.fields());
-      perThread.trimFields(state);
+    Map<FieldInfo, DocFieldConsumerPerField> childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+    Collection<DocFieldConsumerPerField> fields = fields();
+    for (DocFieldConsumerPerField f : fields) {
+      childFields.put(f.getFieldInfo(), f);
     }
+    trimFields(state);
+
     fieldsWriter.flush(state);
-    consumer.flush(childThreadsAndFields, state);
+    consumer.flush(childFields, state);
 
     // Important to save after asking consumer to flush so
     // consumer can alter the FieldInfo* if necessary.  EG,
@@ -74,6 +96,15 @@ final class DocFieldProcessor extends Do
 
   @Override
   public void abort() {
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField field = fieldHash[i];
+      while(field != null) {
+        final DocFieldProcessorPerField next = field.next;
+        field.abort();
+        field = next;
+      }
+    }
+
     fieldsWriter.abort();
     consumer.abort();
   }
@@ -82,9 +113,317 @@ final class DocFieldProcessor extends Do
   public boolean freeRAM() {
     return consumer.freeRAM();
   }
+  
+  public Collection<DocFieldConsumerPerField> fields() {
+    Collection<DocFieldConsumerPerField> fields = new HashSet<DocFieldConsumerPerField>();
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField field = fieldHash[i];
+      while(field != null) {
+        fields.add(field.consumer);
+        field = field.next;
+      }
+    }
+    assert fields.size() == totalFieldCount;
+    return fields;
+  }
+
+  /** If there are fields we've seen but did not see again
+   *  in the last run, then free them up. */
+
+  void trimFields(SegmentWriteState state) {
+
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField perField = fieldHash[i];
+      DocFieldProcessorPerField lastPerField = null;
+
+      while (perField != null) {
+
+        if (perField.lastGen == -1) {
+
+          // This field was not seen since the previous
+          // flush, so, free up its resources now
+
+          // Unhash
+          if (lastPerField == null)
+            fieldHash[i] = perField.next;
+          else
+            lastPerField.next = perField.next;
+
+          if (state.infoStream != null) {
+            state.infoStream.println("  purge field=" + perField.fieldInfo.name);
+          }
+
+          totalFieldCount--;
+
+        } else {
+          // Reset
+          perField.lastGen = -1;
+          lastPerField = perField;
+        }
+
+        perField = perField.next;
+      }
+    }
+  }
+
+  private void rehash() {
+    final int newHashSize = (fieldHash.length*2);
+    assert newHashSize > fieldHash.length;
+
+    final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
+
+    // Rehash
+    int newHashMask = newHashSize-1;
+    for(int j=0;j<fieldHash.length;j++) {
+      DocFieldProcessorPerField fp0 = fieldHash[j];
+      while(fp0 != null) {
+        final int hashPos2 = fp0.fieldInfo.name.hashCode() & newHashMask;
+        DocFieldProcessorPerField nextFP0 = fp0.next;
+        fp0.next = newHashArray[hashPos2];
+        newHashArray[hashPos2] = fp0;
+        fp0 = nextFP0;
+      }
+    }
+
+    fieldHash = newHashArray;
+    hashMask = newHashMask;
+  }
 
   @Override
-  public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
-    return new DocFieldProcessorPerThread(threadState, this);
+  public DocumentsWriterPerThread.DocWriter processDocument() throws IOException {
+
+    consumer.startDocument();
+    fieldsWriter.startDocument();
+
+    final Document doc = docState.doc;
+
+    fieldCount = 0;
+    
+    final int thisFieldGen = fieldGen++;
+
+    final List<Fieldable> docFields = doc.getFields();
+    final int numDocFields = docFields.size();
+
+    // Absorb any new fields first seen in this document.
+    // Also absorb any changes to fields we had already
+    // seen before (eg suddenly turning on norms or
+    // vectors, etc.):
+
+    for(int i=0;i<numDocFields;i++) {
+      Fieldable field = docFields.get(i);
+      final String fieldName = field.name();
+
+      // Make sure we have a PerField allocated
+      final int hashPos = fieldName.hashCode() & hashMask;
+      DocFieldProcessorPerField fp = fieldHash[hashPos];
+      while(fp != null && !fp.fieldInfo.name.equals(fieldName)) {
+        fp = fp.next;
+      }
+        
+      if (fp == null) {
+
+        // TODO FI: we need to genericize the "flags" that a
+        // field holds, and, how these flags are merged; it
+        // needs to be more "pluggable" such that if I want
+        // to have a new "thing" my Fields can do, I can
+        // easily add it
+        FieldInfo fi = fieldInfos.add(fieldName, field.isIndexed(), field.isTermVectorStored(),
+                                      field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+                                      field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+
+        fp = new DocFieldProcessorPerField(this, fi);
+        fp.next = fieldHash[hashPos];
+        fieldHash[hashPos] = fp;
+        totalFieldCount++;
+
+        if (totalFieldCount >= fieldHash.length/2)
+          rehash();
+      } else {
+        fp.fieldInfo.update(field.isIndexed(), field.isTermVectorStored(),
+                            field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+                            field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+      }
+      
+      if (thisFieldGen != fp.lastGen) {
+
+        // First time we're seeing this field for this doc
+        fp.fieldCount = 0;
+
+        if (fieldCount == fields.length) {
+          final int newSize = fields.length*2;
+          DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
+          System.arraycopy(fields, 0, newArray, 0, fieldCount);
+          fields = newArray;
+        }
+
+        fields[fieldCount++] = fp;
+        fp.lastGen = thisFieldGen;
+      }
+
+      if (fp.fieldCount == fp.fields.length) {
+        Fieldable[] newArray = new Fieldable[fp.fields.length*2];
+        System.arraycopy(fp.fields, 0, newArray, 0, fp.fieldCount);
+        fp.fields = newArray;
+      }
+
+      fp.fields[fp.fieldCount++] = field;
+      if (field.isStored()) {
+        fieldsWriter.addField(field, fp.fieldInfo);
+      }
+    }
+
+    // If we are writing vectors then we must visit
+    // fields in sorted order so they are written in
+    // sorted order.  TODO: we actually only need to
+    // sort the subset of fields that have vectors
+    // enabled; we could save [small amount of] CPU
+    // here.
+    quickSort(fields, 0, fieldCount-1);
+
+    for(int i=0;i<fieldCount;i++)
+      fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);
+
+    if (docState.maxTermPrefix != null && docState.infoStream != null) {
+      docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterRAMAllocator.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'"); 
+      docState.maxTermPrefix = null;
+    }
+
+    final DocumentsWriterPerThread.DocWriter one = fieldsWriter.finishDocument();
+    final DocumentsWriterPerThread.DocWriter two = consumer.finishDocument();
+    if (one == null) {
+      return two;
+    } else if (two == null) {
+      return one;
+    } else {
+      PerDoc both = getPerDoc();
+      both.docID = docState.docID;
+      assert one.docID == docState.docID;
+      assert two.docID == docState.docID;
+      both.one = one;
+      both.two = two;
+      return both;
+    }
+  }
+
+  void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
+    if (lo >= hi)
+      return;
+    else if (hi == 1+lo) {
+      if (array[lo].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+        final DocFieldProcessorPerField tmp = array[lo];
+        array[lo] = array[hi];
+        array[hi] = tmp;
+      }
+      return;
+    }
+
+    int mid = (lo + hi) >>> 1;
+
+    if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+      DocFieldProcessorPerField tmp = array[lo];
+      array[lo] = array[mid];
+      array[mid] = tmp;
+    }
+
+    if (array[mid].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+      DocFieldProcessorPerField tmp = array[mid];
+      array[mid] = array[hi];
+      array[hi] = tmp;
+
+      if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+        DocFieldProcessorPerField tmp2 = array[lo];
+        array[lo] = array[mid];
+        array[mid] = tmp2;
+      }
+    }
+
+    int left = lo + 1;
+    int right = hi - 1;
+
+    if (left >= right)
+      return;
+
+    DocFieldProcessorPerField partition = array[mid];
+
+    for (; ;) {
+      while (array[right].fieldInfo.name.compareTo(partition.fieldInfo.name) > 0)
+        --right;
+
+      while (left < right && array[left].fieldInfo.name.compareTo(partition.fieldInfo.name) <= 0)
+        ++left;
+
+      if (left < right) {
+        DocFieldProcessorPerField tmp = array[left];
+        array[left] = array[right];
+        array[right] = tmp;
+        --right;
+      } else {
+        break;
+      }
+    }
+
+    quickSort(array, lo, left);
+    quickSort(array, left + 1, hi);
+  }
+
+  PerDoc[] docFreeList = new PerDoc[1];
+  int freeCount;
+  int allocCount;
+
+  PerDoc getPerDoc() {
+    if (freeCount == 0) {
+      allocCount++;
+      if (allocCount > docFreeList.length) {
+        // Grow our free list up front to make sure we have
+        // enough space to recycle all outstanding PerDoc
+        // instances
+        assert allocCount == 1+docFreeList.length;
+        docFreeList = new PerDoc[ArrayUtil.oversize(allocCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
+      }
+      return new PerDoc();
+    } else
+      return docFreeList[--freeCount];
+  }
+
+  void freePerDoc(PerDoc perDoc) {
+    assert freeCount < docFreeList.length;
+    docFreeList[freeCount++] = perDoc;
+  }
+
+  class PerDoc extends DocumentsWriterPerThread.DocWriter {
+
+    DocumentsWriterPerThread.DocWriter one;
+    DocumentsWriterPerThread.DocWriter two;
+
+    @Override
+    public long sizeInBytes() {
+      return one.sizeInBytes() + two.sizeInBytes();
+    }
+
+    @Override
+    public void finish() throws IOException {
+      try {
+        try {
+          one.finish();
+        } finally {
+          two.finish();
+        }
+      } finally {
+        freePerDoc(this);
+      }
+    }
+
+    @Override
+    public void abort() {
+      try {
+        try {
+          one.abort();
+        } finally {
+          two.abort();
+        }
+      } finally {
+        freePerDoc(this);
+      }
+    }
   }
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Wed Jul 21 10:27:20 2010
@@ -34,8 +34,8 @@ final class DocFieldProcessorPerField {
   int fieldCount;
   Fieldable[] fields = new Fieldable[1];
 
-  public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
-    this.consumer = perThread.consumer.addField(fieldInfo);
+  public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
+    this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
     this.fieldInfo = fieldInfo;
   }
 

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java Wed Jul 21 10:27:20 2010
@@ -18,12 +18,13 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-
 import java.util.Map;
 
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
+import org.apache.lucene.util.AttributeSource;
+
 
 /** This is a DocFieldConsumer that inverts each field,
  *  separately, from a Document, and accepts a
@@ -34,7 +35,32 @@ final class DocInverter extends DocField
   final InvertedDocConsumer consumer;
   final InvertedDocEndConsumer endConsumer;
 
-  public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+  final DocumentsWriterPerThread.DocState docState;
+
+  final FieldInvertState fieldState = new FieldInvertState();
+
+  final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
+  
+  static class SingleTokenAttributeSource extends AttributeSource {
+    final CharTermAttribute termAttribute;
+    final OffsetAttribute offsetAttribute;
+    
+    private SingleTokenAttributeSource() {
+      termAttribute = addAttribute(CharTermAttribute.class);
+      offsetAttribute = addAttribute(OffsetAttribute.class);
+    }
+    
+    public void reinit(String stringValue, int startOffset,  int endOffset) {
+      termAttribute.setEmpty().append(stringValue);
+      offsetAttribute.setOffset(startOffset, endOffset);
+    }
+  }
+  
+  // Used to read a string value for a field
+  final ReusableStringReader stringReader = new ReusableStringReader();
+
+  public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+    this.docState = docState;
     this.consumer = consumer;
     this.endConsumer = endConsumer;
   }
@@ -47,33 +73,37 @@ final class DocInverter extends DocField
   }
 
   @Override
-  void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
-    Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
-    Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
-
-    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet() ) {
+  void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
 
+    Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
+    Map<FieldInfo, InvertedDocEndConsumerPerField> endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
 
-      DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
-
-      Collection<InvertedDocConsumerPerField> childFields = new HashSet<InvertedDocConsumerPerField>();
-      Collection<InvertedDocEndConsumerPerField> endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
-      for (final DocFieldConsumerPerField field: entry.getValue() ) {  
-        DocInverterPerField perField = (DocInverterPerField) field;
-        childFields.add(perField.consumer);
-        endChildFields.add(perField.endConsumer);
-      }
-
-      childThreadsAndFields.put(perThread.consumer, childFields);
-      endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
+    for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+      DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
+      childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
+      endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
     }
     
-    consumer.flush(childThreadsAndFields, state);
-    endConsumer.flush(endChildThreadsAndFields, state);
+    consumer.flush(childFieldsToFlush, state);
+    endConsumer.flush(endChildFieldsToFlush, state);
+  }
+  
+  @Override
+  public void startDocument() throws IOException {
+    consumer.startDocument();
+    endConsumer.startDocument();
   }
 
   @Override
+  public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
+    // TODO: allow endConsumer.finishDocument to also return
+    // a DocWriter
+    endConsumer.finishDocument();
+    return consumer.finishDocument();
+  }
+
+
+  @Override
   public void closeDocStore(SegmentWriteState state) throws IOException {
     consumer.closeDocStore(state);
     endConsumer.closeDocStore(state);
@@ -81,17 +111,21 @@ final class DocInverter extends DocField
 
   @Override
   void abort() {
-    consumer.abort();
-    endConsumer.abort();
+    try {
+      consumer.abort();
+    } finally {
+      endConsumer.abort();
+    }
   }
 
   @Override
   public boolean freeRAM() {
     return consumer.freeRAM();
   }
-
+  
   @Override
-  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
-    return new DocInverterPerThread(docFieldProcessorPerThread, this);
+  public DocFieldConsumerPerField addField(FieldInfo fi) {
+    return new DocInverterPerField(this, fi);
   }
+
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Wed Jul 21 10:27:20 2010
@@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
 
 final class DocInverterPerField extends DocFieldConsumerPerField {
 
-  final private DocInverterPerThread perThread;
-  final private FieldInfo fieldInfo;
+  final private DocInverter parent;
+  final FieldInfo fieldInfo;
   final InvertedDocConsumerPerField consumer;
   final InvertedDocEndConsumerPerField endConsumer;
-  final DocumentsWriter.DocState docState;
+  final DocumentsWriterPerThread.DocState docState;
   final FieldInvertState fieldState;
 
-  public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
-    this.perThread = perThread;
+  public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
+    this.parent = parent;
     this.fieldInfo = fieldInfo;
-    docState = perThread.docState;
-    fieldState = perThread.fieldState;
-    this.consumer = perThread.consumer.addField(this, fieldInfo);
-    this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
+    docState = parent.docState;
+    fieldState = parent.fieldState;
+    this.consumer = parent.consumer.addField(this, fieldInfo);
+    this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
   }
 
   @Override
@@ -84,8 +84,8 @@ final class DocInverterPerField extends 
         if (!field.isTokenized()) {		  // un-tokenized field
           String stringValue = field.stringValue();
           final int valueLength = stringValue.length();
-          perThread.singleToken.reinit(stringValue, 0, valueLength);
-          fieldState.attributeSource = perThread.singleToken;
+          parent.singleToken.reinit(stringValue, 0, valueLength);
+          fieldState.attributeSource = parent.singleToken;
           consumer.start(field);
 
           boolean success = false;
@@ -93,8 +93,9 @@ final class DocInverterPerField extends 
             consumer.add();
             success = true;
           } finally {
-            if (!success)
+            if (!success) {
               docState.docWriter.setAborting();
+            }
           }
           fieldState.offset += valueLength;
           fieldState.length++;
@@ -119,8 +120,8 @@ final class DocInverterPerField extends 
               if (stringValue == null) {
                 throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
               }
-              perThread.stringReader.init(stringValue);
-              reader = perThread.stringReader;
+              parent.stringReader.init(stringValue);
+              reader = parent.stringReader;
             }
           
             // Tokenize field and add to postingTable
@@ -173,8 +174,9 @@ final class DocInverterPerField extends 
                 consumer.add();
                 success = true;
               } finally {
-                if (!success)
+                if (!success) {
                   docState.docWriter.setAborting();
+                }
               }
               fieldState.position++;
               if (++fieldState.length >= maxFieldLength) {
@@ -208,4 +210,9 @@ final class DocInverterPerField extends 
     consumer.finish();
     endConsumer.finish();
   }
+
+  @Override
+  FieldInfo getFieldInfo() {
+    return this.fieldInfo;
+  }
 }

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,459 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.codecs.Codec;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMFile;
+import org.apache.lucene.util.ArrayUtil;
+
+public class DocumentsWriterPerThread {
+  
+  /**
+   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
+   * which returns the DocConsumer that the DocumentsWriter calls to process the
+   * documents. 
+   */
+  abstract static class IndexingChain {
+    abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
+  }
+
+  
+  static final IndexingChain defaultIndexingChain = new IndexingChain() {
+
+    @Override
+    DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
+      /*
+      This is the current indexing chain:
+
+      DocConsumer / DocConsumerPerThread
+        --> code: DocFieldProcessor / DocFieldProcessorPerThread
+          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
+            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
+              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
+                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
+                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
+                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
+                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
+                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
+              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
+    */
+
+    // Build up indexing chain:
+
+      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
+      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
+
+      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter,
+                                                           new TermsHash(documentsWriterPerThread, termVectorsWriter, null));
+      final NormsWriter normsWriter = new NormsWriter();
+      final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
+      return new DocFieldProcessor(documentsWriterPerThread, docInverter);
+    }
+  };
+  
+  static class DocState {
+    final DocumentsWriterPerThread docWriter;
+    Analyzer analyzer;
+    int maxFieldLength;
+    PrintStream infoStream;
+    Similarity similarity;
+    int docID;
+    Document doc;
+    String maxTermPrefix;
+
+    DocState(DocumentsWriterPerThread docWriter) {
+      this.docWriter = docWriter;
+    }
+    
+    // Only called by asserts
+    public boolean testPoint(String name) {
+      return docWriter.writer.testPoint(name);
+    }
+  }
+  
+  /** Called if we hit an exception at a bad time (when
+   *  updating the index files) and must discard all
+   *  currently buffered docs.  This resets our state,
+   *  discarding any docs added since last flush. */
+  void abort() throws IOException {
+    try {
+      if (infoStream != null) {
+        message("docWriter: now abort");
+      }
+      try {
+        consumer.abort();
+      } catch (Throwable t) {
+      }
+
+      docStoreSegment = null;
+      numDocsInStore = 0;
+      docStoreOffset = 0;
+
+      // Reset all postings data
+      doAfterFlush();
+
+    } finally {
+      aborting = false;
+      if (infoStream != null) {
+        message("docWriter: done abort");
+      }
+    }
+  }
+
+  
+  final DocumentsWriterRAMAllocator ramAllocator = new DocumentsWriterRAMAllocator();
+
+  final DocumentsWriter parent;
+  final IndexWriter writer;
+  
+  final Directory directory;
+  final DocState docState;
+  final DocConsumer consumer;
+  private DocFieldProcessor docFieldProcessor;
+  
+  String segment;                         // Current segment we are working on
+  private String docStoreSegment;         // Current doc-store segment we are writing
+  private int docStoreOffset;                     // Current starting doc-store offset of current segment
+  boolean aborting;               // True if an abort is pending
+  
+  private final PrintStream infoStream;
+  private int numDocsInRAM;
+  private int numDocsInStore;
+  private int flushedDocCount;
+  SegmentWriteState flushState;
+
+  long[] sequenceIDs = new long[8];
+  
+  final List<String> closedFiles = new ArrayList<String>();
+  
+  long numBytesUsed;
+  
+  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
+    this.directory = directory;
+    this.parent = parent;
+    this.writer = parent.indexWriter;
+    this.infoStream = parent.indexWriter.getInfoStream();
+    this.docState = new DocState(this);
+    this.docState.similarity = parent.config.getSimilarity();
+    this.docState.maxFieldLength = parent.config.getMaxFieldLength();
+    
+    consumer = indexingChain.getChain(this);
+    if (consumer instanceof DocFieldProcessor) {
+      docFieldProcessor = (DocFieldProcessor) consumer;
+    }
+
+  }
+  
+  void setAborting() {
+    aborting = true;
+  }
+  
+  public void addDocument(Document doc, Analyzer analyzer) throws IOException {
+    docState.doc = doc;
+    docState.analyzer = analyzer;
+    docState.docID = numDocsInRAM;
+    initSegmentName(false);
+  
+    final DocWriter perDoc;
+    
+    boolean success = false;
+    try {
+      perDoc = consumer.processDocument();
+      
+      success = true;
+    } finally {
+      if (!success) {
+        if (!aborting) {
+          // mark document as deleted
+          commitDocument(-1);
+        }
+      }
+    }
+
+    success = false;
+    try {
+      if (perDoc != null) {
+        perDoc.finish();
+      }
+      
+      success = true;
+    } finally {
+      if (!success) {
+        setAborting();
+      }
+    }
+
+  }
+
+  public void commitDocument(long sequenceID) {
+    if (numDocsInRAM == sequenceIDs.length) {
+      sequenceIDs = ArrayUtil.grow(sequenceIDs);
+    }
+    
+    sequenceIDs[numDocsInRAM] = sequenceID;
+    numDocsInRAM++;
+    numDocsInStore++;
+  }
+  
+  int getNumDocsInRAM() {
+    return numDocsInRAM;
+  }
+  
+  long getMinSequenceID() {
+    if (numDocsInRAM == 0) {
+      return -1;
+    }
+    return sequenceIDs[0];
+  }
+  
+  /** Returns true if any of the fields in the current
+   *  buffered docs have omitTermFreqAndPositions==false */
+  boolean hasProx() {
+    return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx()
+                                      : true;
+  }
+  
+  Codec getCodec() {
+    return flushState.codec;
+  }
+  
+  void initSegmentName(boolean onlyDocStore) {
+    if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
+      // this call is synchronized on IndexWriter.segmentInfos
+      segment = writer.newSegmentName();
+      assert numDocsInRAM == 0;
+    }
+    if (docStoreSegment == null) {
+      docStoreSegment = segment;
+      assert numDocsInStore == 0;
+    }
+  }
+
+  
+  private void initFlushState(boolean onlyDocStore) {
+    initSegmentName(onlyDocStore);
+    flushState = new SegmentWriteState(infoStream, directory, segment, docFieldProcessor.fieldInfos,
+                                       docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(),
+                                       writer.codecs);
+  }
+  
+  /** Reset after a flush */
+  private void doAfterFlush() throws IOException {
+    segment = null;
+    numDocsInRAM = 0;
+  }
+    
+  /** Flush all pending docs to a new segment */
+  SegmentInfo flush(boolean closeDocStore) throws IOException {
+    assert numDocsInRAM > 0;
+
+    initFlushState(closeDocStore);
+
+    docStoreOffset = numDocsInStore;
+
+    if (infoStream != null) {
+      message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
+    }
+    
+    boolean success = false;
+
+    try {
+
+      if (closeDocStore) {
+        assert flushState.docStoreSegmentName != null;
+        assert flushState.docStoreSegmentName.equals(flushState.segmentName);
+        closeDocStore();
+        flushState.numDocsInStore = 0;
+      }
+      
+      consumer.flush(flushState);
+
+      if (infoStream != null) {
+        SegmentInfo si = new SegmentInfo(flushState.segmentName,
+            flushState.numDocs,
+            directory, false,
+            docStoreOffset, flushState.docStoreSegmentName,
+            false,    
+            hasProx(),
+            getCodec());
+
+        final long newSegmentSize = si.sizeInBytes();
+        String message = "  ramUsed=" + ramAllocator.nf.format(((double) numBytesUsed)/1024./1024.) + " MB" +
+          " newFlushedSize=" + newSegmentSize +
+          " docs/MB=" + ramAllocator.nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
+          " new/old=" + ramAllocator.nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
+        message(message);
+      }
+
+      flushedDocCount += flushState.numDocs;
+
+      long maxSequenceID = sequenceIDs[numDocsInRAM-1];
+      doAfterFlush();
+      
+      // Create new SegmentInfo, but do not add to our
+      // segmentInfos until deletes are flushed
+      // successfully.
+      SegmentInfo newSegment = new SegmentInfo(flushState.segmentName,
+                                   flushState.numDocs,
+                                   directory, false,
+                                   docStoreOffset, flushState.docStoreSegmentName,
+                                   false,    
+                                   hasProx(),
+                                   getCodec());
+
+      
+      newSegment.setMinSequenceID(sequenceIDs[0]);
+      newSegment.setMaxSequenceID(maxSequenceID);
+      
+      IndexWriter.setDiagnostics(newSegment, "flush");
+      success = true;
+
+      return newSegment;
+    } finally {
+      if (!success) {
+        setAborting();
+      }
+    }
+  }
+
+  /** Closes the currently open doc stores and returns the
+   *  doc store segment name.  This returns null if there
+   *  are no buffered documents. */
+  String closeDocStore() throws IOException {
+
+    // nocommit
+//    if (infoStream != null)
+//      message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
+    
+    boolean success = false;
+
+    try {
+      initFlushState(true);
+      closedFiles.clear();
+
+      consumer.closeDocStore(flushState);
+      // nocommit
+      //assert 0 == openFiles.size();
+
+      String s = docStoreSegment;
+      docStoreSegment = null;
+      docStoreOffset = 0;
+      numDocsInStore = 0;
+      success = true;
+      return s;
+    } finally {
+      if (!success) {
+        parent.abort();
+      }
+    }
+  }
+
+  
+  /** Get current segment name we are writing. */
+  String getSegment() {
+    return segment;
+  }
+  
+  /** Returns the current doc store segment we are writing
+   *  to. */
+  String getDocStoreSegment() {
+    return docStoreSegment;
+  }
+
+  /** Returns the doc offset into the shared doc store for
+   *  the current buffered docs. */
+  int getDocStoreOffset() {
+    return docStoreOffset;
+  }
+
+
+  @SuppressWarnings("unchecked")
+  List<String> closedFiles() {
+    return (List<String>) ((ArrayList<String>) closedFiles).clone();
+  }
+
+  void addOpenFile(String name) {
+    synchronized(parent.openFiles) {
+      assert !parent.openFiles.contains(name);
+      parent.openFiles.add(name);
+    }
+  }
+
+  void removeOpenFile(String name) {
+    synchronized(parent.openFiles) {
+      assert parent.openFiles.contains(name);
+      parent.openFiles.remove(name);
+    }
+    closedFiles.add(name);
+  }
+  
+  /** Consumer returns this on each doc.  This holds any
+   *  state that must be flushed in docID order.  We gather
+   *  these and flush them in order. */
+  abstract static class DocWriter {
+    DocWriter next;
+    int docID;
+    abstract void finish() throws IOException;
+    abstract void abort();
+    abstract long sizeInBytes();
+
+    void setNext(DocWriter next) {
+      this.next = next;
+    }
+  }
+
+  /**
+   * Create and return a new PerDocBuffer.
+   */
+  PerDocBuffer newPerDocBuffer() {
+    return new PerDocBuffer();
+  }
+
+  /**
+   * RAMFile buffer for DocWriters.
+   */
+  class PerDocBuffer extends RAMFile {
+    
+    /**
+     * Allocate bytes used from shared pool.
+     */
+    protected byte[] newBuffer(int size) {
+      assert size == DocumentsWriterRAMAllocator.PER_DOC_BLOCK_SIZE;
+      return ramAllocator.perDocAllocator.getByteBlock();
+    }
+    
+    /**
+     * Recycle the bytes used.
+     */
+    synchronized void recycle() {
+      if (buffers.size() > 0) {
+        setLength(0);
+        
+        // Recycle the blocks
+        ramAllocator.perDocAllocator.recycleByteBlocks(buffers);
+        buffers.clear();
+        sizeInBytes = 0;
+        
+        assert numBuffers() == 0;
+      }
+    }
+  }
+  
+  void bytesUsed(long numBytes) {
+    ramAllocator.bytesUsed(numBytes);
+  }
+  
+  void message(String message) {
+    if (infoStream != null)
+      writer.message("DW: " + message);
+  }
+}
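
For readers following the indexing-chain comment in DocumentsWriterPerThread above: the default chain wires two TermsHash instances (freq/prox postings plus term vectors) in front of the norms writer and the stored-fields/doc-field processing. As a hedged sketch only, using nothing but the constructors the default chain itself calls, an alternate chain could be plugged in like the following. The class name NoTermVectorsIndexingChain is hypothetical, the code is not part of this commit, and whether the rest of the pipeline tolerates the missing term-vectors pass is not established by this patch.

    package org.apache.lucene.index;

    // Hypothetical illustration only (not part of the patch): an IndexingChain that
    // wires up postings and norms but omits the secondary TermsHash used for term
    // vectors.  It reuses the constructors the default chain calls above.
    class NoTermVectorsIndexingChain extends DocumentsWriterPerThread.IndexingChain {
      @Override
      DocConsumer getChain(DocumentsWriterPerThread perThread) {
        final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
        // null as the next TermsHash: no term-vectors pass is chained in.
        final InvertedDocConsumer termsHash = new TermsHash(perThread, freqProxWriter, null);
        final NormsWriter normsWriter = new NormsWriter();
        final DocInverter docInverter = new DocInverter(perThread.docState, termsHash, normsWriter);
        return new DocFieldProcessor(perThread, docInverter);
      }
    }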

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,148 @@
+package org.apache.lucene.index;
+
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.util.Constants;
+
+class DocumentsWriterRAMAllocator {
+  final ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
+  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
+
+  
+  class ByteBlockAllocator extends ByteBlockPool.Allocator {
+    final int blockSize;
+
+    ByteBlockAllocator(int blockSize) {
+      this.blockSize = blockSize;
+    }
+
+    ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
+    
+    /* Allocate another byte[] from the shared pool */
+    @Override
+    byte[] getByteBlock() {
+      final int size = freeByteBlocks.size();
+      final byte[] b;
+      if (0 == size) {
+        b = new byte[blockSize];
+        // Always record a block allocated, even if
+        // trackAllocations is false.  This is necessary
+        // because this block will be shared between
+        // things that don't track allocations (term
+        // vectors) and things that do (freq/prox
+        // postings).
+        numBytesUsed += blockSize;
+      } else
+        b = freeByteBlocks.remove(size-1);
+      return b;
+    }
+
+    /* Return byte[]'s to the pool */
+    @Override
+    void recycleByteBlocks(byte[][] blocks, int start, int end) {
+      for(int i=start;i<end;i++) {
+        freeByteBlocks.add(blocks[i]);
+      }
+    }
+
+    @Override
+    void recycleByteBlocks(List<byte[]> blocks) {
+      final int size = blocks.size();
+      for(int i=0;i<size;i++) {
+        freeByteBlocks.add(blocks.get(i));
+      }
+    }
+  }
+
+  private ArrayList<int[]> freeIntBlocks = new ArrayList<int[]>();
+
+  /* Allocate another int[] from the shared pool */
+  int[] getIntBlock() {
+    final int size = freeIntBlocks.size();
+    final int[] b;
+    if (0 == size) {
+      b = new int[INT_BLOCK_SIZE];
+      // Always record a block allocated, even if
+      // trackAllocations is false.  This is necessary
+      // because this block will be shared between
+      // things that don't track allocations (term
+      // vectors) and things that do (freq/prox
+      // postings).
+      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
+    } else
+      b = freeIntBlocks.remove(size-1);
+    return b;
+  }
+
+  void bytesUsed(long numBytes) {
+    numBytesUsed += numBytes;
+  }
+
+  /* Return int[]s to the pool */
+  void recycleIntBlocks(int[][] blocks, int start, int end) {
+    for(int i=start;i<end;i++)
+      freeIntBlocks.add(blocks[i]);
+  }
+
+  long getRAMUsed() {
+    return numBytesUsed;
+  }
+
+  long numBytesUsed;
+
+  NumberFormat nf = NumberFormat.getInstance();
+
+  final static int PER_DOC_BLOCK_SIZE = 1024;
+  
+  // Coarse estimates used to measure RAM usage of buffered deletes
+  final static int OBJECT_HEADER_BYTES = 8;
+  final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
+  final static int INT_NUM_BYTE = 4;
+  final static int CHAR_NUM_BYTE = 2;
+
+  /* Rough logic: HashMap has an array[Entry] w/ varying
+     load factor (say 2 * POINTER).  Entry is object w/ Term
+     key, BufferedDeletes.Num val, int hash, Entry next
+     (OBJ_HEADER + 3*POINTER + INT).  Term is object w/
+     String field and String text (OBJ_HEADER + 2*POINTER).
+     We don't count Term's field since it's interned.
+     Term's text is String (OBJ_HEADER + 4*INT + POINTER +
+     OBJ_HEADER + string.length*CHAR).  BufferedDeletes.num is
+     OBJ_HEADER + INT. */
+ 
+  final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE;
+
+  /* Rough logic: del docIDs are List<Integer>.  Say list
+     allocates ~2X size (2*POINTER).  Integer is OBJ_HEADER
+     + int */
+  final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
+
+  /* Rough logic: HashMap has an array[Entry] w/ varying
+     load factor (say 2 * POINTER).  Entry is object w/
+     Query key, Integer val, int hash, Entry next
+     (OBJ_HEADER + 3*POINTER + INT).  Query we often
+     undercount (say 24 bytes).  Integer is OBJ_HEADER + INT. */
+  final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24;
+
+  /* Initial chunk size of the shared byte[] blocks used to
+     store postings data */
+  final static int BYTE_BLOCK_SHIFT = 15;
+  final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
+  final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
+  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
+
+  final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
+
+  /* Initial chunk size of the shared int[] blocks used to
+     store postings data */
+  final static int INT_BLOCK_SHIFT = 13;
+  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
+  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
+
+  String toMB(long v) {
+    return nf.format(v/1024./1024.);
+  }
+
+}
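
The "rough logic" comments above give coarse per-entry RAM estimates for buffered deletes. As a worked example: on a 64-bit JRE (POINTER_NUM_BYTE == 8), BYTES_PER_DEL_TERM = 8*8 + 5*8 + 6*4 = 128 bytes of fixed overhead per deleted Term, plus CHAR_NUM_BYTE (2 bytes) per character of term text, with the interned field name not counted. A small same-package sketch, not part of the commit, could compute the estimate like this:

    package org.apache.lucene.index;

    // Hypothetical helper class (same package as the allocator; not in the patch).
    class DeleteTermRamEstimate {
      // Estimate the RAM cost of buffering one delete-by-Term entry, following the
      // "rough logic" comments in DocumentsWriterRAMAllocator.
      static long bytesPerDeletedTerm(String termText) {
        // Fixed part: 8 pointers + 5 object headers + 6 ints = 8*8 + 5*8 + 6*4 = 128
        // bytes, assuming a 64-bit JRE (Constants.JRE_IS_64BIT == true).
        long bytes = DocumentsWriterRAMAllocator.BYTES_PER_DEL_TERM;
        // Variable part: the term's text characters; the interned field name is not counted.
        bytes += (long) termText.length() * DocumentsWriterRAMAllocator.CHAR_NUM_BYTE;
        return bytes;
      }
    }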

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,255 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+abstract class DocumentsWriterThreadPool {
+  public static abstract class Task<T> {
+    private boolean clearThreadBindings = false;
+    
+    protected void clearThreadBindings() {
+      this.clearThreadBindings = true;
+    }
+    
+    boolean doClearThreadBindings() {
+      return clearThreadBindings;
+    }
+  }
+
+  public static abstract class PerThreadTask<T> extends Task<T> {
+    abstract T process(final DocumentsWriterPerThread perThread) throws IOException;
+  }
+  
+  public static abstract class AllThreadsTask<T> extends Task<T> {
+    abstract T process(final Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException;
+  }
+
+  protected abstract static class ThreadState {
+    private DocumentsWriterPerThread perThread;
+    private boolean isIdle = true;
+    
+    void start() {/* extension hook */}
+    void finish() {/* extension hook */}
+  }
+  
+  private int pauseThreads = 0;
+  
+  protected final int maxNumThreadStates;
+  protected ThreadState[] allThreadStates = new ThreadState[0];
+  
+  private final Lock lock = new ReentrantLock();
+  private final Condition threadStateAvailable = lock.newCondition();
+  private boolean globalLock;
+  private boolean aborting;
+
+  DocumentsWriterThreadPool(int maxNumThreadStates) {
+    this.maxNumThreadStates = (maxNumThreadStates < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumThreadStates;
+  }
+  
+  public final int getMaxThreadStates() {
+    return this.maxNumThreadStates;
+  }
+  
+  void pauseAllThreads() {
+    lock.lock();
+    try {
+      pauseThreads++;
+      while(!allThreadsIdle()) {
+        try {
+          threadStateAvailable.await();
+        } catch (InterruptedException ie) {
+          throw new ThreadInterruptedException(ie);
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  void resumeAllThreads() {
+    lock.lock();
+    try {
+      pauseThreads--;
+      assert pauseThreads >= 0;
+      if (0 == pauseThreads) {
+        threadStateAvailable.signalAll();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private boolean allThreadsIdle() {
+    for (ThreadState state : allThreadStates) {
+      if (!state.isIdle) {
+        return false;
+      }
+    }
+    
+    return true;
+  }
+  
+  void abort() throws IOException {
+    pauseAllThreads();
+    aborting = true;
+    for (ThreadState state : allThreadStates) {
+      state.perThread.abort();
+    }
+  }
+  
+  void finishAbort() {
+    aborting = false;
+    resumeAllThreads();
+  }
+
+  public <T> T executeAllThreads(AllThreadsTask<T> task) throws IOException {
+    T result = null;
+    
+    lock.lock();
+    try {
+      try {
+        while (globalLock) {
+          threadStateAvailable.await();
+        }
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+      
+      globalLock = true;
+      pauseAllThreads();
+    } finally {
+      lock.unlock();
+    }
+
+    
+    // all threads are idle now
+    
+    try {
+      final ThreadState[] localAllThreads = allThreadStates;
+      
+      result = task.process(new Iterator<DocumentsWriterPerThread>() {
+        int i = 0;
+  
+        @Override
+        public boolean hasNext() {
+          return i < localAllThreads.length;
+        }
+  
+        @Override
+        public DocumentsWriterPerThread next() {
+          return localAllThreads[i++].perThread;
+        }
+  
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("remove() not supported.");
+        }
+      });
+      return result;
+    } finally {
+      lock.lock();
+      try {
+        try {
+          if (task.doClearThreadBindings()) {
+            clearAllThreadBindings();
+          }
+        } finally {
+          globalLock = false;
+          resumeAllThreads();
+          threadStateAvailable.signalAll();
+        }
+      } finally {
+        lock.unlock();
+      }
+      
+    }
+  }
+
+  
+  public final <T> T executePerThread(DocumentsWriter documentsWriter, Document doc, PerThreadTask<T> task) throws IOException {
+    ThreadState state = acquireThreadState(documentsWriter, doc);
+    boolean success = false;
+    try {
+      T result = task.process(state.perThread);
+      success = true;
+      return result;
+    } finally {
+      boolean abort = false;
+      if (!success && state.perThread.aborting) {
+        state.perThread.aborting = false;
+        abort = true;
+      }
+
+      returnDocumentsWriterPerThread(state, task.doClearThreadBindings());
+      
+      if (abort) {
+        documentsWriter.abort();
+      }
+    }
+  }
+  
+  protected final <T extends ThreadState> T addNewThreadState(DocumentsWriter documentsWriter, T threadState) {
+    // Just create a new "private" thread state
+    ThreadState[] newArray = new ThreadState[1+allThreadStates.length];
+    if (allThreadStates.length > 0)
+      System.arraycopy(allThreadStates, 0, newArray, 0, allThreadStates.length);
+    threadState.perThread = documentsWriter.newDocumentsWriterPerThread(); 
+    newArray[allThreadStates.length] = threadState;
+
+    allThreadStates = newArray;
+    return threadState;
+  }
+  
+  protected abstract ThreadState selectThreadState(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
+  protected void clearThreadBindings(ThreadState flushedThread) {
+    // subclasses can optionally override this to clean up after a thread's state was flushed
+  }
+
+  protected void clearAllThreadBindings() {
+    // subclasses can optionally override this to clean up after all thread states were flushed
+  }
+  
+  
+  private final ThreadState acquireThreadState(DocumentsWriter documentsWriter, Document doc) {
+    lock.lock();
+    try {
+      ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc);
+      
+      try {
+        while (!threadState.isIdle || globalLock || aborting) {
+          threadStateAvailable.await();
+        }
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+      
+      threadState.isIdle = false;
+      threadState.start();
+      
+      return threadState;
+      
+    } finally {
+      lock.unlock();
+    }
+  }
+  
+  private final void returnDocumentsWriterPerThread(ThreadState state, boolean clearThreadBindings) {
+    lock.lock();
+    try {
+      state.finish();
+      if (clearThreadBindings) {
+        clearThreadBindings(state);
+      }
+      state.isIdle = true;
+      threadStateAvailable.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+}
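
executePerThread acquires an idle ThreadState for the calling thread (chosen by the subclass's selectThreadState), runs the task against that state's DocumentsWriterPerThread, and on failure with a pending abort triggers documentsWriter.abort() before releasing the state. A hedged usage sketch follows; threadPool, documentsWriter, doc, analyzer and the nextSequenceID() helper are all assumed for illustration and are not defined by this patch, and the code would need to live in the org.apache.lucene.index package.

    // Hypothetical usage sketch (not part of the patch): adding one document via the pool.
    long seqID = threadPool.executePerThread(documentsWriter, doc,
        new DocumentsWriterThreadPool.PerThreadTask<Long>() {
          @Override
          Long process(DocumentsWriterPerThread perThread) throws IOException {
            perThread.addDocument(doc, analyzer);
            // nextSequenceID() is an assumed helper; the patch tracks per-doc
            // sequence IDs but does not show where they are assigned.
            long sequenceID = documentsWriter.nextSequenceID();
            perThread.commitDocument(sequenceID);
            return sequenceID;
          }
        });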


