lucene-java-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r919060 - in /lucene/java/trunk: ./ src/java/org/apache/lucene/index/ src/java/org/apache/lucene/store/ src/test/org/apache/lucene/store/
Date Thu, 04 Mar 2010 16:46:18 GMT
Author: mikemccand
Date: Thu Mar  4 16:46:18 2010
New Revision: 919060

URL: http://svn.apache.org/viewvc?rev=919060&view=rev
Log:
LUCENE-2283: use shared byte[] pool to buffer pending stored fields & term vectors during
indexing; fixes excessive memory usage for mixed tiny & big docs with many threads

Modified:
    lucene/java/trunk/CHANGES.txt
    lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/StoredFieldsWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/store/RAMFile.java
    lucene/java/trunk/src/java/org/apache/lucene/store/RAMOutputStream.java
    lucene/java/trunk/src/test/org/apache/lucene/store/TestHugeRamFile.java

Modified: lucene/java/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/trunk/CHANGES.txt?rev=919060&r1=919059&r2=919060&view=diff
==============================================================================
--- lucene/java/trunk/CHANGES.txt (original)
+++ lucene/java/trunk/CHANGES.txt Thu Mar  4 16:46:18 2010
@@ -101,6 +101,12 @@
 * LUCENE-2273: FieldCacheImpl.getCacheEntries() used WeakHashMap
   incorrectly and lead to ConcurrentModificationException.
   (Uwe Schindler, Robert Muir)
+
+* LUCENE-2283: Use shared memory pool for term vector and stored
+  fields buffers. This memory will be reclaimed if needed according to
+  the configured RAM Buffer Size for the IndexWriter.  This also fixes
+  potentially excessive memory usage when many threads are indexing a
+  mix of small and large documents.  (Tim Smith via Mike McCandless)
   
 New features
 

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=919060&r1=919059&r2=919060&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Mar  4 16:46:18 2010
@@ -37,6 +37,7 @@
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMFile;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.ThreadInterruptedException;
@@ -172,6 +173,46 @@
       this.next = next;
     }
   }
+
+  /**
+   * Create and return a new DocWriterBuffer.
+   */
+  PerDocBuffer newPerDocBuffer() {
+    return new PerDocBuffer();
+  }
+
+  /**
+   * RAMFile buffer for DocWriters.
+   */
+  class PerDocBuffer extends RAMFile {
+    
+    /**
+     * Allocate bytes used from shared pool.
+     */
+    protected byte[] newBuffer(int size) {
+      assert size == PER_DOC_BLOCK_SIZE;
+      return perDocAllocator.getByteBlock(false);
+    }
+    
+    /**
+     * Recycle the bytes used.
+     */
+    synchronized void recycle() {
+      if (buffers.size() > 0) {
+        setLength(0);
+        
+        // Recycle the blocks
+        final int blockCount = buffers.size();
+        
+        final byte[][] blocks = buffers.toArray( new byte[blockCount][] );
+        perDocAllocator.recycleByteBlocks(blocks, 0, blockCount);
+        buffers.clear();
+        sizeInBytes = 0;
+        
+        assert numBuffers() == 0;
+      }
+    }
+  }
   
   /**
    * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
@@ -1200,6 +1241,11 @@
   final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
 
   private class ByteBlockAllocator extends ByteBlockPool.Allocator {
+    final int blockSize;
+
+    ByteBlockAllocator(int blockSize) {
+      this.blockSize = blockSize;
+    }
 
     ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
     
@@ -1216,12 +1262,12 @@
           // things that don't track allocations (term
           // vectors) and things that do (freq/prox
           // postings).
-          numBytesAlloc += BYTE_BLOCK_SIZE;
-          b = new byte[BYTE_BLOCK_SIZE];
+          numBytesAlloc += blockSize;
+          b = new byte[blockSize];
         } else
           b = freeByteBlocks.remove(size-1);
         if (trackAllocations)
-          numBytesUsed += BYTE_BLOCK_SIZE;
+          numBytesUsed += blockSize;
         assert numBytesUsed <= numBytesAlloc;
         return b;
       }
@@ -1282,7 +1328,12 @@
       freeIntBlocks.add(blocks[i]);
   }
 
-  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator();
+  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
+
+  final static int PER_DOC_BLOCK_SIZE = 1024;
+
+  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
+
 
   /* Initial chunk size of the shared char[] blocks used to
      store term text */
@@ -1322,10 +1373,12 @@
     return nf.format(v/1024./1024.);
   }
 
-  /* We have three pools of RAM: Postings, byte blocks
-   * (holds freq/prox posting data) and char blocks (holds
-   * characters in the term).  Different docs require
-   * varying amount of storage from these three classes.
+  /* We have four pools of RAM: Postings, byte blocks
+   * (holds freq/prox posting data), char blocks (holds
+   * characters in the term) and per-doc buffers (stored fields/term vectors).  
+   * Different docs require varying amount of storage from 
+   * these four classes.
+   * 
    * For example, docs with many unique single-occurrence
    * short terms will use up the Postings RAM and hardly any
    * of the other two.  Whereas docs with very large terms
@@ -1349,6 +1402,7 @@
                 " deletesMB=" + toMB(deletesRAMUsed) +
                 " vs trigger=" + toMB(freeTrigger) +
                 " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
+                " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
                 " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));
 
       final long startBytesAlloc = numBytesAlloc + deletesRAMUsed;
@@ -1364,7 +1418,11 @@
       while(numBytesAlloc+deletesRAMUsed > freeLevel) {
       
         synchronized(this) {
-          if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == freeIntBlocks.size() && !any) {
+          if (0 == perDocAllocator.freeByteBlocks.size() 
+              && 0 == byteBlockAllocator.freeByteBlocks.size() 
+              && 0 == freeCharBlocks.size() 
+              && 0 == freeIntBlocks.size() 
+              && !any) {
             // Nothing else to free -- must flush now.
             bufferIsFull = numBytesUsed+deletesRAMUsed > flushTrigger;
             if (infoStream != null) {
@@ -1377,23 +1435,34 @@
             break;
           }
 
-          if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) {
+          if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
             byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
             numBytesAlloc -= BYTE_BLOCK_SIZE;
           }
 
-          if ((1 == iter % 4) && freeCharBlocks.size() > 0) {
+          if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
             freeCharBlocks.remove(freeCharBlocks.size()-1);
             numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
           }
 
-          if ((2 == iter % 4) && freeIntBlocks.size() > 0) {
+          if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
             freeIntBlocks.remove(freeIntBlocks.size()-1);
             numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
           }
+
+          if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
+            // Remove upwards of 32 blocks (each block is 1K)
+            for (int i = 0; i < 32; ++i) {
+              perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
+              numBytesAlloc -= PER_DOC_BLOCK_SIZE;
+              if (perDocAllocator.freeByteBlocks.size() == 0) {
+                break;
+              }
+            }
+          }
         }
 
-        if ((3 == iter % 4) && any)
+        if ((4 == iter % 5) && any)
           // Ask consumer to free any recycled state
           any = consumer.freeRAM();
 

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/StoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/StoredFieldsWriter.java?rev=919060&r1=919059&r2=919060&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/StoredFieldsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/StoredFieldsWriter.java Thu Mar  4 16:46:18 2010
@@ -166,14 +166,13 @@
   }
 
   class PerDoc extends DocumentsWriter.DocWriter {
-
-    // TODO: use something more memory efficient; for small
-    // docs the 1024 buffer size of RAMOutputStream wastes alot
-    RAMOutputStream fdt = new RAMOutputStream();
+    final DocumentsWriter.PerDocBuffer buffer = docWriter.newPerDocBuffer();
+    RAMOutputStream fdt = new RAMOutputStream(buffer);
     int numStoredFields;
 
     void reset() {
       fdt.reset();
+      buffer.recycle();
       numStoredFields = 0;
     }
 
@@ -185,7 +184,7 @@
 
     @Override
     public long sizeInBytes() {
-      return fdt.sizeInBytes();
+      return buffer.getSizeInBytes();
     }
 
     @Override

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java?rev=919060&r1=919059&r2=919060&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Thu Mar  4 16:46:18 2010
@@ -248,9 +248,9 @@
 
   class PerDoc extends DocumentsWriter.DocWriter {
 
-    // TODO: use something more memory efficient; for small
-    // docs the 1024 buffer size of RAMOutputStream wastes alot
-    RAMOutputStream perDocTvf = new RAMOutputStream();
+    final DocumentsWriter.PerDocBuffer buffer = docWriter.newPerDocBuffer();
+    RAMOutputStream perDocTvf = new RAMOutputStream(buffer);
+
     int numVectorFields;
 
     int[] fieldNumbers = new int[1];
@@ -258,6 +258,7 @@
 
     void reset() {
       perDocTvf.reset();
+      buffer.recycle();
       numVectorFields = 0;
     }
 
@@ -281,7 +282,7 @@
 
     @Override
     public long sizeInBytes() {
-      return perDocTvf.sizeInBytes();
+      return buffer.getSizeInBytes();
     }
 
     @Override

Modified: lucene/java/trunk/src/java/org/apache/lucene/store/RAMFile.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/store/RAMFile.java?rev=919060&r1=919059&r2=919060&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/store/RAMFile.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/store/RAMFile.java Thu Mar  4 16:46:18 2010
@@ -20,44 +20,45 @@
 import java.util.ArrayList;
 import java.io.Serializable;
 
-class RAMFile implements Serializable {
+/** @lucene.internal */
+public class RAMFile implements Serializable {
 
   private static final long serialVersionUID = 1l;
 
-  private ArrayList<byte[]> buffers = new ArrayList<byte[]>();
+  protected ArrayList<byte[]> buffers = new ArrayList<byte[]>();
   long length;
   RAMDirectory directory;
-  long sizeInBytes;
+  protected long sizeInBytes;
 
   // This is publicly modifiable via Directory.touchFile(), so direct access not supported
   private long lastModified = System.currentTimeMillis();
 
   // File used as buffer, in no RAMDirectory
-  RAMFile() {}
+  protected RAMFile() {}
   
   RAMFile(RAMDirectory directory) {
     this.directory = directory;
   }
 
   // For non-stream access from thread that might be concurrent with writing
-  synchronized long getLength() {
+  public synchronized long getLength() {
     return length;
   }
 
-  synchronized void setLength(long length) {
+  protected synchronized void setLength(long length) {
     this.length = length;
   }
 
   // For non-stream access from thread that might be concurrent with writing
-  synchronized long getLastModified() {
+  public synchronized long getLastModified() {
     return lastModified;
   }
 
-  synchronized void setLastModified(long lastModified) {
+  protected synchronized void setLastModified(long lastModified) {
     this.lastModified = lastModified;
   }
 
-  final byte[] addBuffer(int size) {
+  protected final byte[] addBuffer(int size) {
     byte[] buffer = newBuffer(size);
     synchronized(this) {
       buffers.add(buffer);
@@ -70,11 +71,11 @@
     return buffer;
   }
 
-  final synchronized byte[] getBuffer(int index) {
+  protected final synchronized byte[] getBuffer(int index) {
     return buffers.get(index);
   }
 
-  final synchronized int numBuffers() {
+  protected final synchronized int numBuffers() {
     return buffers.size();
   }
 
@@ -84,11 +85,11 @@
    * @param size size of allocated buffer.
    * @return allocated buffer.
    */
-  byte[] newBuffer(int size) {
+  protected byte[] newBuffer(int size) {
     return new byte[size];
   }
 
-  synchronized long getSizeInBytes() {
+  public synchronized long getSizeInBytes() {
     return sizeInBytes;
   }
   

Modified: lucene/java/trunk/src/java/org/apache/lucene/store/RAMOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/store/RAMOutputStream.java?rev=919060&r1=919059&r2=919060&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/store/RAMOutputStream.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/store/RAMOutputStream.java Thu Mar  4 16:46:18 2010
@@ -40,7 +40,7 @@
     this(new RAMFile());
   }
 
-  RAMOutputStream(RAMFile f) {
+  public RAMOutputStream(RAMFile f) {
     file = f;
 
     // make sure that we switch to the
@@ -66,14 +66,13 @@
     }
   }
 
-  /** Resets this to an empty buffer. */
+  /** Resets this to an empty file. */
   public void reset() {
-    try {
-      seek(0);
-    } catch (IOException e) {                     // should never happen
-      throw new RuntimeException(e.toString());
-    }
-
+    currentBuffer = null;
+    currentBufferIndex = -1;
+    bufferPosition = 0;
+    bufferStart = 0;
+    bufferLength = 0;
     file.setLength(0);
   }
 

Modified: lucene/java/trunk/src/test/org/apache/lucene/store/TestHugeRamFile.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/store/TestHugeRamFile.java?rev=919060&r1=919059&r2=919060&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/store/TestHugeRamFile.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/store/TestHugeRamFile.java Thu Mar  4 16:46:18 2010
@@ -33,7 +33,7 @@
     private long capacity = 0;
     private HashMap<Integer,byte[]> singleBuffers = new HashMap<Integer,byte[]>();
     @Override
-    byte[] newBuffer(int size) {
+    protected byte[] newBuffer(int size) {
       capacity += size;
       if (capacity <= MAX_VALUE) {
         // below maxint we reuse buffers



Mime
View raw message