jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1443129 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Date Wed, 06 Feb 2013 19:00:47 GMT
Author: jukka
Date: Wed Feb  6 19:00:47 2013
New Revision: 1443129

URL: http://svn.apache.org/viewvc?rev=1443129&view=rev
Log:
OAK-593: Segment-based MK

Correct synchronization of SegmentWriter

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1443129&r1=1443128&r2=1443129&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java	Wed Feb  6 19:00:47 2013
@@ -62,7 +62,7 @@ public class SegmentWriter {
         this.blockSegmentSize = blocksPerSegment * BLOCK_SIZE;
     }
 
-    public void flush() {
+    public synchronized void flush() {
         if (buffer.position() > 0) {
             byte[] data = new byte[buffer.position()];
             buffer.flip();
@@ -104,6 +104,54 @@ public class SegmentWriter {
         return new RecordId(uuid, buffer.position());
     }
 
+    private synchronized void writeRecordId(RecordId id) {
+        UUID segmentId = id.getSegmentId();
+        int index = uuids.indexOf(segmentId);
+        if (index == -1) {
+            index = uuids.size();
+            uuids.add(segmentId);
+        }
+        buffer.putInt(index << 24 | id.getOffset());
+    }
+
+    private void writeInlineBlocks(
+            List<RecordId> blockIds, byte[] buffer, int offset, int length) {
+        int begin = offset;
+        int end = offset + length;
+        while (begin + BLOCK_SIZE <= end) {
+            blockIds.add(writeBlock(buffer, begin, BLOCK_SIZE));
+            begin += BLOCK_SIZE;
+        }
+        if (begin < end) {
+            blockIds.add(writeBlock(buffer, begin, end - begin));
+        }
+    }
+
+    private void writeBulkSegment(
+            List<RecordId> blockIds, byte[] buffer, int offset, int length) {
+        UUID segmentId = UUID.randomUUID();
+        store.createSegment(segmentId, buffer, offset, length);
+        for (int position = 0; position < length; position += BLOCK_SIZE) {
+            blockIds.add(new RecordId(segmentId, position));
+        }
+    }
+
+    private synchronized RecordId writeListBucket(List<RecordId> bucket) {
+        RecordId bucketId = prepare(0, bucket);
+        for (RecordId id : bucket) {
+            writeRecordId(id);
+        }
+        return bucketId;
+    }
+
+    private synchronized RecordId writeValueRecord(
+            long length, RecordId blocks) {
+        RecordId valueId = prepare(8, Collections.singleton(blocks));
+        buffer.putLong(length);
+        writeRecordId(blocks);
+        return valueId;
+    }
+
     /**
      * Writes a block record containing the given block of bytes.
      *
@@ -128,38 +176,30 @@ public class SegmentWriter {
      * @param ids list of record identifiers
      * @return list record identifier
      */
-    public synchronized RecordId writeList(List<RecordId> ids) {
+    public RecordId writeList(List<RecordId> ids) {
         checkNotNull(ids);
 
         int size = ids.size();
         if (size == 0) {
             return prepare(0);
-        } else if (size == 1) {
-            return ids.iterator().next();
         } else {
             List<RecordId> thisLevel = ids;
-            do {
+            while (thisLevel.size() > 1) {
                 List<RecordId> nextLevel = new ArrayList<RecordId>();
                 for (List<RecordId> bucket :
                         Lists.partition(thisLevel, ListRecord.LEVEL_SIZE)) {
-                    RecordId bucketId = prepare(0, bucket);
-                    for (RecordId id : bucket) {
-                        writeRecordId(id);
-                    }
-                    nextLevel.add(bucketId);
+                    nextLevel.add(writeListBucket(bucket));
                 }
                 thisLevel = nextLevel;
-            } while (thisLevel.size() > 1);
+            }
             return thisLevel.iterator().next();
         }
     }
 
     /**
-     * Writes a value record containing the given sequence of bytes.
+     * Writes a string value record.
      *
-     * @param bytes source buffer
-     * @param offset offset within the source buffer
-     * @param length number of bytes in the value
+     * @param string string to be written
      * @return value record identifier
      */
     public RecordId writeString(String string) {
@@ -181,82 +221,53 @@ public class SegmentWriter {
             }
         }
 
-        return writeValueRecord(data.length, blockIds);
+        return writeValueRecord(data.length, writeList(blockIds));
     }
 
+    /**
+     * Writes a stream value record. The given stream is consumed
+     * <em>and closed</em> by this method.
+     *
+     * @param stream stream to be written
+     * @return value record identifier
+     * @throws IOException if the stream could not be read
+     */
     public RecordId writeStream(InputStream stream) throws IOException {
-        List<RecordId> blockIds = new ArrayList<RecordId>();
+        try {
+            List<RecordId> blockIds = new ArrayList<RecordId>();
 
-        // First read the head of the stream. This covers most small
-        // binaries and the frequently accessed head of larger ones.
-        // The head gets inlined in the current segment.
-        byte[] head = new byte[INLINE_SIZE];
-        int headLength = ByteStreams.read(stream, head, 0, head.length);
-
-        writeInlineBlocks(blockIds, head, 0, headLength);
-        long length = headLength;
-
-        // If the stream filled the full head buffer, it's likely that
-        // the bulk of the data is still to come. Read it in larger
-        // chunks and save in separate segments.
-        if (headLength == head.length) {
-            byte[] bulk = new byte[blockSegmentSize];
-            int bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
-            while (bulkLength > INLINE_SIZE) {
-                writeBulkSegment(blockIds, bulk, 0, bulkLength);
-                length += bulkLength;
-                bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
-            }
-            // The tail chunk of the stream is too small to put in a separate
-            // segment, so we inline also it.
-            if (bulkLength > 0) {
-                writeInlineBlocks(blockIds, bulk, 0, bulkLength);
-                length += bulkLength;
+            // First read the head of the stream. This covers most small
+            // binaries and the frequently accessed head of larger ones.
+            // The head gets inlined in the current segment.
+            byte[] head = new byte[INLINE_SIZE];
+            int headLength = ByteStreams.read(stream, head, 0, head.length);
+
+            writeInlineBlocks(blockIds, head, 0, headLength);
+            long length = headLength;
+
+            // If the stream filled the full head buffer, it's likely that
+            // the bulk of the data is still to come. Read it in larger
+            // chunks and save in separate segments.
+            if (headLength == head.length) {
+                byte[] bulk = new byte[blockSegmentSize];
+                int bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+                while (bulkLength > INLINE_SIZE) {
+                    writeBulkSegment(blockIds, bulk, 0, bulkLength);
+                    length += bulkLength;
+                    bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+                }
+                // The tail chunk of the stream is too small to put in
+                // a separate segment, so we inline also it.
+                if (bulkLength > 0) {
+                    writeInlineBlocks(blockIds, bulk, 0, bulkLength);
+                    length += bulkLength;
+                }
             }
-        }
-
-        return writeValueRecord(length, blockIds);
-    }
-
-    private RecordId writeValueRecord(long length, List<RecordId> blockIds) {
-        // Store the list of blocks along with the length of the value
-        RecordId listId = writeList(blockIds);
-        RecordId valueId = prepare(8, Collections.singleton(listId));
-        buffer.putLong(length);
-        writeRecordId(listId);
-        return valueId;
-    }
-
-    private void writeInlineBlocks(
-            List<RecordId> blockIds, byte[] buffer, int offset, int length) {
-        int begin = offset;
-        int end = offset + length;
-        while (begin + BLOCK_SIZE <= end) {
-            blockIds.add(writeBlock(buffer, begin, BLOCK_SIZE));
-            begin += BLOCK_SIZE;
-        }
-        if (begin < end) {
-            blockIds.add(writeBlock(buffer, begin, end - begin));
-        }
-    }
 
-    private void writeBulkSegment(
-            List<RecordId> blockIds, byte[] buffer, int offset, int length) {
-        UUID segmentId = UUID.randomUUID();
-        store.createSegment(segmentId, buffer, offset, length);
-        for (int position = 0; position < length; position += BLOCK_SIZE) {
-            blockIds.add(new RecordId(segmentId, position));
-        }
-    }
-
-    private void writeRecordId(RecordId id) {
-        UUID segmentId = id.getSegmentId();
-        int index = uuids.indexOf(segmentId);
-        if (index == -1) {
-            index = uuids.size();
-            uuids.add(segmentId);
+            return writeValueRecord(length, writeList(blockIds));
+        } finally {
+            stream.close();
         }
-        buffer.putInt(index << 24 | id.getOffset());
     }
 
 }



Mime
View raw message