Author: jukka
Date: Wed Feb 6 19:00:47 2013
New Revision: 1443129
URL: http://svn.apache.org/viewvc?rev=1443129&view=rev
Log:
OAK-593: Segment-based MK
Correct synchronization of SegmentWriter
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1443129&r1=1443128&r2=1443129&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Feb 6 19:00:47 2013
@@ -62,7 +62,7 @@ public class SegmentWriter {
this.blockSegmentSize = blocksPerSegment * BLOCK_SIZE;
}
- public void flush() {
+ public synchronized void flush() {
if (buffer.position() > 0) {
byte[] data = new byte[buffer.position()];
buffer.flip();
@@ -104,6 +104,54 @@ public class SegmentWriter {
return new RecordId(uuid, buffer.position());
}
+ private synchronized void writeRecordId(RecordId id) {
+ UUID segmentId = id.getSegmentId();
+ int index = uuids.indexOf(segmentId);
+ if (index == -1) {
+ index = uuids.size();
+ uuids.add(segmentId);
+ }
+ buffer.putInt(index << 24 | id.getOffset());
+ }
+
+ private void writeInlineBlocks(
+ List<RecordId> blockIds, byte[] buffer, int offset, int length) {
+ int begin = offset;
+ int end = offset + length;
+ while (begin + BLOCK_SIZE <= end) {
+ blockIds.add(writeBlock(buffer, begin, BLOCK_SIZE));
+ begin += BLOCK_SIZE;
+ }
+ if (begin < end) {
+ blockIds.add(writeBlock(buffer, begin, end - begin));
+ }
+ }
+
+ private void writeBulkSegment(
+ List<RecordId> blockIds, byte[] buffer, int offset, int length) {
+ UUID segmentId = UUID.randomUUID();
+ store.createSegment(segmentId, buffer, offset, length);
+ for (int position = 0; position < length; position += BLOCK_SIZE) {
+ blockIds.add(new RecordId(segmentId, position));
+ }
+ }
+
+ private synchronized RecordId writeListBucket(List<RecordId> bucket) {
+ RecordId bucketId = prepare(0, bucket);
+ for (RecordId id : bucket) {
+ writeRecordId(id);
+ }
+ return bucketId;
+ }
+
+ private synchronized RecordId writeValueRecord(
+ long length, RecordId blocks) {
+ RecordId valueId = prepare(8, Collections.singleton(blocks));
+ buffer.putLong(length);
+ writeRecordId(blocks);
+ return valueId;
+ }
+
/**
* Writes a block record containing the given block of bytes.
*
@@ -128,38 +176,30 @@ public class SegmentWriter {
* @param ids list of record identifiers
* @return list record identifier
*/
- public synchronized RecordId writeList(List<RecordId> ids) {
+ public RecordId writeList(List<RecordId> ids) {
checkNotNull(ids);
int size = ids.size();
if (size == 0) {
return prepare(0);
- } else if (size == 1) {
- return ids.iterator().next();
} else {
List<RecordId> thisLevel = ids;
- do {
+ while (thisLevel.size() > 1) {
List<RecordId> nextLevel = new ArrayList<RecordId>();
for (List<RecordId> bucket :
Lists.partition(thisLevel, ListRecord.LEVEL_SIZE)) {
- RecordId bucketId = prepare(0, bucket);
- for (RecordId id : bucket) {
- writeRecordId(id);
- }
- nextLevel.add(bucketId);
+ nextLevel.add(writeListBucket(bucket));
}
thisLevel = nextLevel;
- } while (thisLevel.size() > 1);
+ }
return thisLevel.iterator().next();
}
}
/**
- * Writes a value record containing the given sequence of bytes.
+ * Writes a string value record.
*
- * @param bytes source buffer
- * @param offset offset within the source buffer
- * @param length number of bytes in the value
+ * @param string string to be written
* @return value record identifier
*/
public RecordId writeString(String string) {
@@ -181,82 +221,53 @@ public class SegmentWriter {
}
}
- return writeValueRecord(data.length, blockIds);
+ return writeValueRecord(data.length, writeList(blockIds));
}
+ /**
+ * Writes a stream value record. The given stream is consumed
+ * <em>and closed</em> by this method.
+ *
+ * @param stream stream to be written
+ * @return value record identifier
+ * @throws IOException if the stream could not be read
+ */
public RecordId writeStream(InputStream stream) throws IOException {
- List<RecordId> blockIds = new ArrayList<RecordId>();
+ try {
+ List<RecordId> blockIds = new ArrayList<RecordId>();
- // First read the head of the stream. This covers most small
- // binaries and the frequently accessed head of larger ones.
- // The head gets inlined in the current segment.
- byte[] head = new byte[INLINE_SIZE];
- int headLength = ByteStreams.read(stream, head, 0, head.length);
-
- writeInlineBlocks(blockIds, head, 0, headLength);
- long length = headLength;
-
- // If the stream filled the full head buffer, it's likely that
- // the bulk of the data is still to come. Read it in larger
- // chunks and save in separate segments.
- if (headLength == head.length) {
- byte[] bulk = new byte[blockSegmentSize];
- int bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
- while (bulkLength > INLINE_SIZE) {
- writeBulkSegment(blockIds, bulk, 0, bulkLength);
- length += bulkLength;
- bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
- }
- // The tail chunk of the stream is too small to put in a separate
- // segment, so we inline also it.
- if (bulkLength > 0) {
- writeInlineBlocks(blockIds, bulk, 0, bulkLength);
- length += bulkLength;
+ // First read the head of the stream. This covers most small
+ // binaries and the frequently accessed head of larger ones.
+ // The head gets inlined in the current segment.
+ byte[] head = new byte[INLINE_SIZE];
+ int headLength = ByteStreams.read(stream, head, 0, head.length);
+
+ writeInlineBlocks(blockIds, head, 0, headLength);
+ long length = headLength;
+
+ // If the stream filled the full head buffer, it's likely that
+ // the bulk of the data is still to come. Read it in larger
+ // chunks and save in separate segments.
+ if (headLength == head.length) {
+ byte[] bulk = new byte[blockSegmentSize];
+ int bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+ while (bulkLength > INLINE_SIZE) {
+ writeBulkSegment(blockIds, bulk, 0, bulkLength);
+ length += bulkLength;
+ bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+ }
+ // The tail chunk of the stream is too small to put in
+ // a separate segment, so we inline also it.
+ if (bulkLength > 0) {
+ writeInlineBlocks(blockIds, bulk, 0, bulkLength);
+ length += bulkLength;
+ }
}
- }
-
- return writeValueRecord(length, blockIds);
- }
-
- private RecordId writeValueRecord(long length, List<RecordId> blockIds) {
- // Store the list of blocks along with the length of the value
- RecordId listId = writeList(blockIds);
- RecordId valueId = prepare(8, Collections.singleton(listId));
- buffer.putLong(length);
- writeRecordId(listId);
- return valueId;
- }
-
- private void writeInlineBlocks(
- List<RecordId> blockIds, byte[] buffer, int offset, int length) {
- int begin = offset;
- int end = offset + length;
- while (begin + BLOCK_SIZE <= end) {
- blockIds.add(writeBlock(buffer, begin, BLOCK_SIZE));
- begin += BLOCK_SIZE;
- }
- if (begin < end) {
- blockIds.add(writeBlock(buffer, begin, end - begin));
- }
- }
- private void writeBulkSegment(
- List<RecordId> blockIds, byte[] buffer, int offset, int length) {
- UUID segmentId = UUID.randomUUID();
- store.createSegment(segmentId, buffer, offset, length);
- for (int position = 0; position < length; position += BLOCK_SIZE) {
- blockIds.add(new RecordId(segmentId, position));
- }
- }
-
- private void writeRecordId(RecordId id) {
- UUID segmentId = id.getSegmentId();
- int index = uuids.indexOf(segmentId);
- if (index == -1) {
- index = uuids.size();
- uuids.add(segmentId);
+ return writeValueRecord(length, writeList(blockIds));
+ } finally {
+ stream.close();
}
- buffer.putInt(index << 24 | id.getOffset());
}
}
|