lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sim...@apache.org
Subject svn commit: r1141100 - in /lucene/dev/trunk/lucene/src: java/org/apache/lucene/index/values/ java/org/apache/lucene/util/ test/org/apache/lucene/index/values/ test/org/apache/lucene/util/
Date Wed, 29 Jun 2011 13:39:32 GMT
Author: simonw
Date: Wed Jun 29 13:39:32 2011
New Revision: 1141100

URL: http://svn.apache.org/viewvc?rev=1141100&view=rev
Log:
LUCENE-3216: keep doc values in memory during indexing while merge directly to the target file

Added:
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java
Modified:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/Bytes.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/Bytes.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/Bytes.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/Bytes.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/Bytes.java Wed Jun 29 13:39:32 2011
@@ -31,7 +31,6 @@ import org.apache.lucene.store.Directory
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.AttributeSource;
-import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CodecUtil;
 import org.apache.lucene.util.IOUtils;
@@ -116,7 +115,7 @@ public final class Bytes {
 
     if (fixedSize) {
       if (mode == Mode.STRAIGHT) {
-        return new FixedStraightBytesImpl.Writer(dir, id);
+        return new FixedStraightBytesImpl.Writer(dir, id, bytesUsed);
       } else if (mode == Mode.DEREF) {
         return new FixedDerefBytesImpl.Writer(dir, id, bytesUsed);
       } else if (mode == Mode.SORTED) {
@@ -337,37 +336,56 @@ public final class Bytes {
   // TODO: open up this API?!
   static abstract class BytesWriterBase extends Writer {
     private final String id;
-    protected IndexOutput idxOut;
-    protected IndexOutput datOut;
+    private IndexOutput idxOut;
+    private IndexOutput datOut;
     protected BytesRef bytesRef;
-    protected final ByteBlockPool pool;
+    private final Directory dir;
+    private final String codecName;
+    private final int version;
 
     protected BytesWriterBase(Directory dir, String id, String codecName,
-        int version, boolean initIndex, ByteBlockPool pool,
+        int version,
         AtomicLong bytesUsed) throws IOException {
       super(bytesUsed);
       this.id = id;
-      this.pool = pool;
-      datOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
-            DATA_EXTENSION));
+      this.dir = dir;
+      this.codecName = codecName;
+      this.version = version;
+    }
+    
+    protected IndexOutput getDataOut() throws IOException {
+      if (datOut == null) {
+        boolean success = false;
+        try {
+          datOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
+              DATA_EXTENSION));
+          CodecUtil.writeHeader(datOut, codecName, version);
+          success = true;
+        } finally {
+          if (!success) {
+            IOUtils.closeSafely(true, datOut);
+          }
+        }
+      }
+      return datOut;
+    }
+
+    protected IndexOutput getIndexOut() throws IOException {
       boolean success = false;
       try {
-        CodecUtil.writeHeader(datOut, codecName, version);
-        if (initIndex) {
+        if (idxOut == null) {
           idxOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
               INDEX_EXTENSION));
           CodecUtil.writeHeader(idxOut, codecName, version);
-        } else {
-          idxOut = null;
         }
         success = true;
       } finally {
         if (!success) {
-          IOUtils.closeSafely(true, datOut, idxOut);
+          IOUtils.closeSafely(true, idxOut);
         }
       }
+      return idxOut;
     }
-
     /**
      * Must be called only with increasing docIDs. It's OK for some docIDs to be
      * skipped; they will be filled with 0 bytes.
@@ -376,15 +394,7 @@ public final class Bytes {
     public abstract void add(int docID, BytesRef bytes) throws IOException;
 
     @Override
-    public void finish(int docCount) throws IOException {
-      try {
-        IOUtils.closeSafely(false, datOut, idxOut);
-      } finally {
-        if (pool != null) {
-          pool.reset();
-        }
-      }
-    }
+    public abstract void finish(int docCount) throws IOException;
 
     @Override
     protected void mergeDoc(int docID) throws IOException {

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedDerefBytesImpl.java Wed Jun 29 13:39:32 2011
@@ -25,11 +25,13 @@ import org.apache.lucene.index.values.By
 import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -51,9 +53,7 @@ class FixedDerefBytesImpl {
   static class Writer extends BytesWriterBase {
     private int size = -1;
     private int[] docToID;
-    private final BytesRefHash hash = new BytesRefHash(pool,
-        BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
-            BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
+    private final BytesRefHash hash;
     public Writer(Directory dir, String id, AtomicLong bytesUsed)
         throws IOException {
       this(dir, id, new DirectTrackingAllocator(ByteBlockPool.BYTE_BLOCK_SIZE, bytesUsed),
@@ -62,11 +62,12 @@ class FixedDerefBytesImpl {
 
     public Writer(Directory dir, String id, Allocator allocator,
         AtomicLong bytesUsed) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
-          new ByteBlockPool(allocator), bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      hash = new BytesRefHash(new ByteBlockPool(allocator),
+          BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
+              BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
       docToID = new int[1];
-      bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT); // TODO BytesRefHash
-                                                            // uses bytes too!
+      bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
     }
 
     @Override
@@ -75,20 +76,14 @@ class FixedDerefBytesImpl {
         return;
       if (size == -1) {
         size = bytes.length;
-        datOut.writeInt(size);
       } else if (bytes.length != size) {
         throw new IllegalArgumentException("expected bytes size=" + size
             + " but got " + bytes.length);
       }
       int ord = hash.add(bytes);
-
-      if (ord >= 0) {
-        // new added entry
-        datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
-      } else {
+      if (ord < 0) {
         ord = (-ord) - 1;
       }
-
       if (docID >= docToID.length) {
         final int size = docToID.length;
         docToID = ArrayUtil.grow(docToID, 1 + docID);
@@ -102,11 +97,27 @@ class FixedDerefBytesImpl {
     // some last docs that we didn't see
     @Override
     public void finish(int docCount) throws IOException {
+      boolean success = false;
+      final int numValues = hash.size();
+      final IndexOutput datOut = getDataOut();
       try {
-        if (size == -1) {
-          datOut.writeInt(size);
+        datOut.writeInt(size);
+        if (size != -1) {
+          final BytesRef bytesRef = new BytesRef(size);
+          for (int i = 0; i < numValues; i++) {
+            hash.get(i, bytesRef);
+            datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+          }
         }
-        final int count = 1 + hash.size();
+        success = true;
+      } finally {
+        IOUtils.closeSafely(!success, datOut);
+        hash.close();
+      }
+      success = false;
+      final IndexOutput idxOut = getIndexOut();
+      try {
+        final int count = 1 + numValues;
         idxOut.writeInt(count - 1);
         // write index
         final PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
@@ -120,9 +131,9 @@ class FixedDerefBytesImpl {
           w.add(0);
         }
         w.finish();
+        success = true;
       } finally {
-        hash.close();
-        super.finish(docCount);
+        IOUtils.closeSafely(!success, idxOut);
         bytesUsed
             .addAndGet((-docToID.length) * RamUsageEstimator.NUM_BYTES_INT);
         docToID = null;

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedSortedBytesImpl.java Wed Jun 29 13:39:32 2011
@@ -27,12 +27,14 @@ import org.apache.lucene.index.values.By
 import org.apache.lucene.index.values.FixedDerefBytesImpl.Reader.DerefBytesEnum;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash;
 import org.apache.lucene.util.CodecUtil;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -56,10 +58,7 @@ class FixedSortedBytesImpl {
     private int size = -1;
     private int[] docToEntry;
     private final Comparator<BytesRef> comp;
-
-    private final BytesRefHash hash = new BytesRefHash(pool,
-        BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
-            BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
+    private final BytesRefHash hash;
 
     public Writer(Directory dir, String id, Comparator<BytesRef> comp,
         AtomicLong bytesUsed) throws IOException {
@@ -69,10 +68,12 @@ class FixedSortedBytesImpl {
 
     public Writer(Directory dir, String id, Comparator<BytesRef> comp,
         Allocator allocator, AtomicLong bytesUsed) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
-          new ByteBlockPool(allocator), bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      ByteBlockPool pool = new ByteBlockPool(allocator);
+      hash = new BytesRefHash(pool, BytesRefHash.DEFAULT_CAPACITY,
+          new TrackingDirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY,
+              bytesUsed));
       docToEntry = new int[1];
-      // docToEntry[0] = -1;
       bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
       this.comp = comp;
     }
@@ -83,7 +84,6 @@ class FixedSortedBytesImpl {
         return; // default - skip it
       if (size == -1) {
         size = bytes.length;
-        datOut.writeInt(size);
       } else if (bytes.length != size) {
         throw new IllegalArgumentException("expected bytes size=" + size
             + " but got " + bytes.length);
@@ -104,26 +104,36 @@ class FixedSortedBytesImpl {
     // some last docs that we didn't see
     @Override
     public void finish(int docCount) throws IOException {
+      final IndexOutput datOut = getDataOut();
+      boolean success = false;
+      final int count = hash.size();
+      final int[] address = new int[count];
+
       try {
-        if (size == -1) {// no data added
-          datOut.writeInt(size);
-        }
-        final int[] sortedEntries = hash.sort(comp);
-        final int count = hash.size();
-        int[] address = new int[count];
-        // first dump bytes data, recording address as we go
-        for (int i = 0; i < count; i++) {
-          final int e = sortedEntries[i];
-          final BytesRef bytes = hash.get(e, new BytesRef());
-          assert bytes.length == size;
-          datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
-          address[e] = 1 + i;
+        datOut.writeInt(size);
+        if (size != -1) {
+          final int[] sortedEntries = hash.sort(comp);
+          // first dump bytes data, recording address as we go
+          final BytesRef bytesRef = new BytesRef(size);
+          for (int i = 0; i < count; i++) {
+            final int e = sortedEntries[i];
+            final BytesRef bytes = hash.get(e, bytesRef);
+            assert bytes.length == size;
+            datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
+            address[e] = 1 + i;
+          }
         }
-
+        success = true;
+      } finally {
+        IOUtils.closeSafely(!success, datOut);
+        hash.close();
+      }
+      final IndexOutput idxOut = getIndexOut();
+      success = false;
+      try {
         idxOut.writeInt(count);
-
         // next write index
-        PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
+        final PackedInts.Writer w = PackedInts.getWriter(idxOut, docCount,
             PackedInts.bitsRequired(count));
         final int limit;
         if (docCount > docToEntry.length) {
@@ -148,11 +158,10 @@ class FixedSortedBytesImpl {
         }
         w.finish();
       } finally {
-        super.finish(docCount);
+        IOUtils.closeSafely(!success, idxOut);
         bytesUsed.addAndGet((-docToEntry.length)
             * RamUsageEstimator.NUM_BYTES_INT);
         docToEntry = null;
-        hash.close();
       }
     }
   }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/FixedStraightBytesImpl.java Wed Jun 29 13:39:32 2011
@@ -17,14 +17,20 @@ package org.apache.lucene.index.values;
  * limitations under the License.
  */
 
+import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
+
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.index.values.Bytes.BytesBaseSource;
 import org.apache.lucene.index.values.Bytes.BytesReaderBase;
 import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.AttributeSource;
+import org.apache.lucene.util.ByteBlockPool;
+import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
@@ -44,30 +50,59 @@ class FixedStraightBytesImpl {
     private int size = -1;
     // start at -1 if the first added value is > 0
     private int lastDocID = -1;
-    private byte[] oneRecord;
-
-    public Writer(Directory dir, String id) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, false, null, null);
+    private final ByteBlockPool pool;
+    private boolean merge;
+    private final int byteBlockSize;
+    private IndexOutput datOut;
+
+    public Writer(Directory dir, String id, AtomicLong bytesUsed) throws IOException {
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      pool = new ByteBlockPool(new DirectTrackingAllocator(bytesUsed));
+      byteBlockSize = BYTE_BLOCK_SIZE;
     }
 
-
     @Override
     public void add(int docID, BytesRef bytes) throws IOException {
+      assert lastDocID < docID;
+      assert !merge;
       if (size == -1) {
+        if (bytes.length > BYTE_BLOCK_SIZE) {
+          throw new IllegalArgumentException("bytes arrays > " + Short.MAX_VALUE + " are not supported");
+        }
         size = bytes.length;
-        datOut.writeInt(size);
-        oneRecord = new byte[size];
+        pool.nextBuffer();
       } else if (bytes.length != size) {
         throw new IllegalArgumentException("expected bytes size=" + size
             + " but got " + bytes.length);
       }
-      fill(docID);
-      assert bytes.bytes.length >= bytes.length;
-      datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
+      if (lastDocID+1 < docID) {
+        advancePool(docID);
+      }
+      pool.copy(bytes);
+      lastDocID = docID;
+    }
+    
+    private final void advancePool(int docID) {
+      assert !merge;
+      long numBytes = (docID - (lastDocID+1))*size;
+      while(numBytes > 0) {
+        if (numBytes + pool.byteUpto < byteBlockSize) {
+          pool.byteUpto += numBytes;
+          numBytes = 0;
+        } else {
+          numBytes -= byteBlockSize - pool.byteUpto;
+          pool.nextBuffer();
+        }
+      }
+      assert numBytes == 0;
     }
 
     @Override
     protected void merge(MergeState state) throws IOException {
+      merge = true;
+      datOut = getDataOut();
+      boolean success = false;
+      try {
       if (state.bits == null && state.reader instanceof Reader) {
         Reader reader = (Reader) state.reader;
         final int maxDocs = reader.maxDoc;
@@ -77,48 +112,92 @@ class FixedStraightBytesImpl {
         if (size == -1) {
           size = reader.size;
           datOut.writeInt(size);
-          oneRecord = new byte[size];
         }
-        fill(state.docBase);
+        if (lastDocID+1 < state.docBase) {
+          fill(datOut, state.docBase);
+          lastDocID = state.docBase-1;
+        }
         // TODO should we add a transfer to API to each reader?
         final IndexInput cloneData = reader.cloneData();
         try {
           datOut.copyBytes(cloneData, size * maxDocs);
         } finally {
-          cloneData.close();  
+          IOUtils.closeSafely(true, cloneData);  
         }
         
-        lastDocID += maxDocs - 1;
+        lastDocID += maxDocs;
       } else {
         super.merge(state);
       }
+      success = true;
+      } finally {
+        if (!success) {
+          IOUtils.closeSafely(!success, datOut);
+        }
+      }
+    }
+    
+    
+
+    @Override
+    protected void mergeDoc(int docID) throws IOException {
+      assert lastDocID < docID;
+      if (size == -1) {
+        size = bytesRef.length;
+        datOut.writeInt(size);
+      }
+      assert size == bytesRef.length;
+      if (lastDocID+1 < docID) {
+        fill(datOut, docID);
+      }
+      datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+      lastDocID = docID;
     }
 
+
+
     // Fills up to but not including this docID
-    private void fill(int docID) throws IOException {
+    private void fill(IndexOutput datOut, int docID) throws IOException {
       assert size >= 0;
-      for (int i = lastDocID + 1; i < docID; i++) {
-        datOut.writeBytes(oneRecord, size);
+      final long numBytes = (docID - (lastDocID+1))*size;
+      final byte zero = 0;
+      for (long i = 0; i < numBytes; i++) {
+        datOut.writeByte(zero);
       }
-      lastDocID = docID;
     }
 
     @Override
     public void finish(int docCount) throws IOException {
+      boolean success = false;
       try {
-        if (size == -1) {// no data added
-          datOut.writeInt(0);
+        if (!merge) {
+          // indexing path - no disk IO until here
+          assert datOut == null;
+          datOut = getDataOut();
+          if (size == -1) {
+            datOut.writeInt(0);
+          } else {
+            datOut.writeInt(size);
+            pool.writePool(datOut);
+          }
+          if (lastDocID + 1 < docCount) {
+            fill(datOut, docCount);
+          }
         } else {
-          fill(docCount);
+          // merge path - datOut should be initialized
+          assert datOut != null;
+          if (size == -1) {// no data added
+            datOut.writeInt(0);
+          } else {
+            fill(datOut, docCount);
+          }
         }
+        success = true;
       } finally {
-        super.finish(docCount);
+        pool.dropBuffersAndReset();
+        IOUtils.closeSafely(!success, datOut);
       }
     }
-
-    public long ramBytesUsed() {
-      return oneRecord == null ? 0 : oneRecord.length;
-    }
   }
   
   public static class Reader extends BytesReaderBase {

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarDerefBytesImpl.java Wed Jun 29 13:39:32 2011
@@ -27,12 +27,14 @@ import org.apache.lucene.index.values.Fi
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash;
 import org.apache.lucene.util.CodecUtil;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -113,7 +115,7 @@ class VarDerefBytesImpl {
 
     private final AddressByteStartArray array = new AddressByteStartArray(1,
         bytesUsed);
-    private final BytesRefHash hash = new BytesRefHash(pool, 16, array);
+    private final BytesRefHash hash;
 
     public Writer(Directory dir, String id, AtomicLong bytesUsed)
         throws IOException {
@@ -123,8 +125,8 @@ class VarDerefBytesImpl {
 
     public Writer(Directory dir, String id, Allocator allocator,
         AtomicLong bytesUsed) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
-          new ByteBlockPool(allocator), bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      hash = new BytesRefHash(new ByteBlockPool(allocator), 16, array);
       docToAddress = new int[1];
       bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
     }
@@ -144,8 +146,7 @@ class VarDerefBytesImpl {
       final int docAddress;
       if (e >= 0) {
         docAddress = array.address[e] = address;
-        address += writePrefixLength(datOut, bytes);
-        datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
+        address += bytes.length < 128 ? 1 : 2;
         address += bytes.length;
       } else {
         docAddress = array.address[(-e) - 1];
@@ -169,6 +170,24 @@ class VarDerefBytesImpl {
     // some last docs that we didn't see
     @Override
     public void finish(int docCount) throws IOException {
+      final IndexOutput datOut = getDataOut();
+      boolean success = false;
+      try {
+        final int size = hash.size();
+        final BytesRef bytesRef = new BytesRef();
+        for (int i = 0; i < size; i++) {
+          hash.get(i, bytesRef);
+          writePrefixLength(datOut, bytesRef);
+          datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+        }
+        success = true;
+      } finally {
+        hash.close();
+        IOUtils.closeSafely(!success, datOut);
+      }
+      
+      final IndexOutput idxOut = getIndexOut();
+      success = false;
       try {
         idxOut.writeInt(address - 1);
         // write index
@@ -189,9 +208,9 @@ class VarDerefBytesImpl {
           w.add(0);
         }
         w.finish();
+        success = true;
       } finally {
-        hash.close();
-        super.finish(docCount);
+        IOUtils.closeSafely(!success,idxOut);
         bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT
             * (-docToAddress.length));
         docToAddress = null;

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarSortedBytesImpl.java Wed Jun 29 13:39:32 2011
@@ -27,11 +27,13 @@ import org.apache.lucene.index.values.By
 import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefHash;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -56,9 +58,7 @@ class VarSortedBytesImpl {
     private int[] docToEntry;
     private final Comparator<BytesRef> comp;
 
-    private final BytesRefHash hash = new BytesRefHash(pool,
-        BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
-            BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
+    private final BytesRefHash hash; 
 
     public Writer(Directory dir, String id, Comparator<BytesRef> comp,
         AtomicLong bytesUsed) throws IOException {
@@ -68,13 +68,14 @@ class VarSortedBytesImpl {
 
     public Writer(Directory dir, String id, Comparator<BytesRef> comp,
         Allocator allocator, AtomicLong bytesUsed) throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
-          new ByteBlockPool(allocator), bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      this.hash = new BytesRefHash(new ByteBlockPool(allocator),
+          BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
+              BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
       this.comp = comp;
       docToEntry = new int[1];
       docToEntry[0] = -1;
       bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
-
     }
 
     @Override
@@ -99,14 +100,16 @@ class VarSortedBytesImpl {
     @Override
     public void finish(int docCount) throws IOException {
       final int count = hash.size();
+      final IndexOutput datOut = getDataOut();
+      long offset = 0;
+      long lastOffset = 0;
+      final int[] index = new int[count];
+      final long[] offsets = new long[count];
+      boolean success = false;
       try {
         final int[] sortedEntries = hash.sort(comp);
         // first dump bytes data, recording index & offset as
         // we go
-        long offset = 0;
-        long lastOffset = 0;
-        final int[] index = new int[count];
-        final long[] offsets = new long[count];
         for (int i = 0; i < count; i++) {
           final int e = sortedEntries[i];
           offsets[i] = offset;
@@ -118,7 +121,14 @@ class VarSortedBytesImpl {
           lastOffset = offset;
           offset += bytes.length;
         }
-
+        success = true;
+      } finally {
+        IOUtils.closeSafely(!success, datOut);
+        hash.close();
+      }
+      final IndexOutput idxOut = getIndexOut();
+      success = false;
+      try {
         // total bytes of data
         idxOut.writeLong(offset);
 
@@ -145,11 +155,12 @@ class VarSortedBytesImpl {
           offsetWriter.add(offsets[i]);
         }
         offsetWriter.finish();
+        success = true;
       } finally {
-        super.finish(docCount);
         bytesUsed.addAndGet((-docToEntry.length)
             * RamUsageEstimator.NUM_BYTES_INT);
-        hash.close();
+        docToEntry = null;
+        IOUtils.closeSafely(!success, idxOut);
       }
     }
   }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/values/VarStraightBytesImpl.java Wed Jun 29 13:39:32 2011
@@ -25,12 +25,17 @@ import org.apache.lucene.index.values.By
 import org.apache.lucene.index.values.Bytes.BytesWriterBase;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.AttributeSource;
+import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.PagedBytes;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
 import org.apache.lucene.util.packed.PackedInts;
+import org.apache.lucene.util.packed.PackedInts.ReaderIterator;
 
 // Variable length byte[] per document, no sharing
 
@@ -48,11 +53,15 @@ class VarStraightBytesImpl {
     // start at -1 if the first added value is > 0
     private int lastDocID = -1;
     private long[] docToAddress;
-
+    private final ByteBlockPool pool;
+    private IndexOutput datOut;
+    private boolean merge = false;
     public Writer(Directory dir, String id, AtomicLong bytesUsed)
         throws IOException {
-      super(dir, id, CODEC_NAME, VERSION_CURRENT, true, null, bytesUsed);
+      super(dir, id, CODEC_NAME, VERSION_CURRENT, bytesUsed);
+      pool = new ByteBlockPool(new DirectTrackingAllocator(bytesUsed));
       docToAddress = new long[1];
+      pool.nextBuffer(); // init
       bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
     }
 
@@ -67,21 +76,109 @@ class VarStraightBytesImpl {
       for (int i = lastDocID + 1; i < docID; i++) {
         docToAddress[i] = address;
       }
-      lastDocID = docID;
     }
 
     @Override
     public void add(int docID, BytesRef bytes) throws IOException {
-      if (bytes.length == 0)
+      assert !merge;
+      if (bytes.length == 0) {
         return; // default
+      }
       fill(docID);
       docToAddress[docID] = address;
-      datOut.writeBytes(bytes.bytes, bytes.offset, bytes.length);
+      pool.copy(bytes);
       address += bytes.length;
+      lastDocID = docID;
     }
+    
+    @Override
+    protected void merge(MergeState state) throws IOException {
+      merge = true;
+      datOut = getDataOut();
+      boolean success = false;
+      try {
+        if (state.bits == null && state.reader instanceof Reader) {
+          // bulk merge since we don't have any deletes
+          Reader reader = (Reader) state.reader;
+          final int maxDocs = reader.maxDoc;
+          if (maxDocs == 0) {
+            return;
+          }
+          if (lastDocID+1 < state.docBase) {
+            fill(state.docBase);
+            lastDocID = state.docBase-1;
+          }
+          final long numDataBytes;
+          final IndexInput cloneIdx = reader.cloneIndex();
+          try {
+            numDataBytes = cloneIdx.readVLong();
+            final ReaderIterator iter = PackedInts.getReaderIterator(cloneIdx);
+            for (int i = 0; i < maxDocs; i++) {
+              long offset = iter.next();
+              ++lastDocID;
+              if (lastDocID >= docToAddress.length) {
+                int oldSize = docToAddress.length;
+                docToAddress = ArrayUtil.grow(docToAddress, 1 + lastDocID);
+                bytesUsed.addAndGet((docToAddress.length - oldSize)
+                    * RamUsageEstimator.NUM_BYTES_INT);
+              }
+              docToAddress[lastDocID] = address + offset;
+            }
+            address += numDataBytes; // this is the address after all addr pointers are updated
+            iter.close();
+          } finally {
+            IOUtils.closeSafely(true, cloneIdx);
+          }
+          final IndexInput cloneData = reader.cloneData();
+          try {
+            datOut.copyBytes(cloneData, numDataBytes);
+          } finally {
+            IOUtils.closeSafely(true, cloneData);  
+          }
+        } else {
+          super.merge(state);
+        }
+        success = true;
+      } finally {
+        if (!success) {
+          IOUtils.closeSafely(!success, datOut);
+        }
+      }
+    }
+    
+    @Override
+    protected void mergeDoc(int docID) throws IOException {
+      assert merge;
+      assert lastDocID < docID;
+      if (bytesRef.length == 0) {
+        return; // default
+      }
+      fill(docID);
+      datOut.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+      docToAddress[docID] = address;
+      address += bytesRef.length;
+      lastDocID = docID;
+    }
+    
 
     @Override
     public void finish(int docCount) throws IOException {
+      boolean success = false;
+      assert (!merge && datOut == null) || (merge && datOut != null); 
+      final IndexOutput datOut = getDataOut();
+      try {
+        if (!merge) {
+          // header is already written in getDataOut()
+          pool.writePool(datOut);
+        }
+        success = true;
+      } finally {
+        IOUtils.closeSafely(!success, datOut); 
+        pool.dropBuffersAndReset();
+      }
+
+      success = false;
+      final IndexOutput idxOut = getIndexOut();
       try {
         if (lastDocID == -1) {
           idxOut.writeVLong(0);
@@ -101,11 +198,12 @@ class VarStraightBytesImpl {
           }
           w.finish();
         }
+        success = true;
       } finally {
         bytesUsed.addAndGet(-(docToAddress.length)
             * RamUsageEstimator.NUM_BYTES_INT);
         docToAddress = null;
-        super.finish(docCount);
+        IOUtils.closeSafely(!success, idxOut);
       }
     }
 
@@ -179,21 +277,23 @@ class VarStraightBytesImpl {
     }
 
     private class VarStraightBytesEnum extends ValuesEnum {
-      private final PackedInts.Reader addresses;
+      private final PackedInts.ReaderIterator addresses;
       private final IndexInput datIn;
       private final IndexInput idxIn;
       private final long fp;
       private final long totBytes;
       private int pos = -1;
+      private long nextAddress;
 
       protected VarStraightBytesEnum(AttributeSource source, IndexInput datIn,
           IndexInput idxIn) throws IOException {
         super(source, ValueType.BYTES_VAR_STRAIGHT);
         totBytes = idxIn.readVLong();
         fp = datIn.getFilePointer();
-        addresses = PackedInts.getReader(idxIn);
+        addresses = PackedInts.getReaderIterator(idxIn);
         this.datIn = datIn;
         this.idxIn = idxIn;
+        nextAddress = addresses.next();
       }
 
       @Override
@@ -207,7 +307,7 @@ class VarStraightBytesImpl {
         if (target >= maxDoc) {
           return pos = NO_MORE_DOCS;
         }
-        final long addr = addresses.get(target);
+        final long addr = pos+1 == target ? nextAddress : addresses.advance(target);
         if (addr == totBytes) { // empty values at the end
           bytesRef.length = 0;
           bytesRef.offset = 0;
@@ -215,7 +315,7 @@ class VarStraightBytesImpl {
         }
         datIn.seek(fp + addr);
         final int size = (int) (target == maxDoc - 1 ? totBytes - addr
-            : addresses.get(target + 1) - addr);
+            : (nextAddress = addresses.next()) - addr);
         if (bytesRef.bytes.length < size) {
           bytesRef.grow(size);
         }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/util/ByteBlockPool.java Wed Jun 29 13:39:32 2011
@@ -16,10 +16,13 @@ package org.apache.lucene.util;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.lucene.store.DataOutput;
+
 import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
 
 /** 
@@ -241,5 +244,42 @@ public final class ByteBlockPool {
     assert term.length >= 0;
     return term;
   }
+  
+  /**
+   * Copies the given {@link BytesRef} at the current positions (
+   * {@link #byteUpto} across buffer boundaries
+   */
+  public final void copy(final BytesRef bytes) {
+    int length = bytes.length;
+    int offset = bytes.offset;
+    int overflow = (length + byteUpto) - BYTE_BLOCK_SIZE;
+    do {
+      if (overflow <= 0) { 
+        System.arraycopy(bytes.bytes, offset, buffer, byteUpto, length);
+        byteUpto += length;
+        break;
+      } else {
+        final int bytesToCopy = length-overflow;
+        System.arraycopy(bytes.bytes, offset, buffer, byteUpto, bytesToCopy);
+        offset += bytesToCopy;
+        length -= bytesToCopy;
+        nextBuffer();
+        overflow = overflow - BYTE_BLOCK_SIZE;
+      }
+    }  while(true);
+  }
+  
+  /**
+   * Writes the pools content to the given {@link DataOutput}
+   */
+  public final void writePool(final DataOutput out) throws IOException {
+    int bytesOffset = byteOffset;
+    int block = 0;
+    while (bytesOffset > 0) {
+      out.writeBytes(buffers[block++], BYTE_BLOCK_SIZE);
+      bytesOffset -= BYTE_BLOCK_SIZE;
+    }
+    out.writeBytes(buffers[block], byteUpto);
+  }
 }
 

Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValues.java Wed Jun 29 13:39:32 2011
@@ -64,7 +64,7 @@ public class TestDocValues extends Lucen
     Writer w = Bytes.getWriter(dir, "test", mode, comp, fixedSize, trackBytes);
     int maxDoc = 220;
     final String[] values = new String[maxDoc];
-    final int fixedLength = 3 + random.nextInt(7);
+    final int fixedLength = 1 + atLeast(50);
     for (int i = 0; i < 100; i++) {
       final String s;
       if (i > 0 && random.nextInt(5) <= 2) {

Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java?rev=1141100&r1=1141099&r2=1141100&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/values/TestDocValuesIndexing.java Wed Jun 29 13:39:32 2011
@@ -329,8 +329,7 @@ public class TestDocValuesIndexing exten
     final int numValues = 50 + atLeast(10);
     for (ValueType byteIndexValue : byteVariantList) {
       List<Closeable> closeables = new ArrayList<Closeable>();
-
-      int bytesSize = 1 + atLeast(10);
+      final int bytesSize = 1 + atLeast(50);
       OpenBitSet deleted = indexValues(w, numValues, byteIndexValue,
           byteVariantList, withDeletions, bytesSize);
       final IndexReader r = IndexReader.open(w, withDeletions);
@@ -357,7 +356,7 @@ public class TestDocValuesIndexing exten
           assertNotNull("expected none null - " + msg, br);
           if (br.length != 0) {
             assertEquals("expected zero bytes of length " + bytesSize + " - "
-                + msg, bytesSize, br.length);
+                + msg + br.utf8ToString(), bytesSize, br.length);
             for (int j = 0; j < br.length; j++) {
               assertEquals("Byte at index " + j + " doesn't match - " + msg, 0,
                   br.bytes[br.offset + j]);
@@ -391,12 +390,12 @@ public class TestDocValuesIndexing exten
         while (withDeletions && deleted.get(v++)) {
           upto += bytesSize;
         }
-
         BytesRef br = bytes.getBytes(i, new BytesRef());
         if (bytesEnum.docID() != i) {
           assertEquals("seek failed for index " + i + " " + msg, i, bytesEnum
               .advance(i));
         }
+        assertTrue(msg, br.length > 0);
         for (int j = 0; j < br.length; j++, upto++) {
           assertTrue(" enumRef not initialized " + msg,
               enumRef.bytes.length > 0);

Added: lucene/dev/trunk/lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java?rev=1141100&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java (added)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/util/TestByteBlockPool.java Wed Jun 29 13:39:32 2011
@@ -0,0 +1,67 @@
+package org.apache.lucene.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public class TestByteBlockPool extends LuceneTestCase {
+
+  public void testCopyRefAndWrite() throws IOException {
+    List<String> list = new ArrayList<String>();
+    int maxLength = atLeast(500);
+    ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
+    pool.nextBuffer();
+    final int numValues = atLeast(100);
+    BytesRef ref = new BytesRef();
+    for (int i = 0; i < numValues; i++) {
+      final String value = _TestUtil.randomRealisticUnicodeString(random,
+          maxLength);
+      list.add(value);
+      ref.copy(value);
+      pool.copy(ref);
+    }
+    RAMDirectory dir = new RAMDirectory();
+    IndexOutput stream = dir.createOutput("foo.txt");
+    pool.writePool(stream);
+    stream.flush();
+    stream.close();
+    IndexInput input = dir.openInput("foo.txt");
+    assertEquals(pool.byteOffset + pool.byteUpto, stream.length());
+    BytesRef expected = new BytesRef();
+    BytesRef actual = new BytesRef();
+    for (String string : list) {
+      expected.copy(string);
+      actual.grow(expected.length);
+      actual.length = expected.length;
+      input.readBytes(actual.bytes, 0, actual.length);
+      assertEquals(expected, actual);
+    }
+    try {
+      input.readByte();
+      fail("must be EOF");
+    } catch (IOException e) {
+      // expected - read past EOF
+    }
+    dir.close();
+  }
+}



Mime
View raw message