jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1547700 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/ main/java/org/apache/jackrabbit/oak/plugins/segment/file/ test/java/org/apache/jackrabbit/oak/plugins/segment/
Date Wed, 04 Dec 2013 03:55:57 GMT
Author: jukka
Date: Wed Dec  4 03:55:56 2013
New Revision: 1547700

URL: http://svn.apache.org/r1547700
Log:
OAK-593: Segment-based MK

Use a background thread to flush the TarMK files once every five seconds

Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
Wed Dec  4 03:55:56 2013
@@ -20,8 +20,6 @@ import static com.google.common.base.Obj
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkPositionIndexes;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static java.util.Collections.emptyList;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory.isDataSegmentId;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
 
@@ -102,13 +100,17 @@ public class Segment {
                 }
             };
 
+    private static final UUID[] NO_REFS = new UUID[0];
+
     private final SegmentStore store;
 
     private final UUID uuid;
 
+    private final UUID[] refids;
+
     private final ByteBuffer data;
 
-    private final int refposition;
+    private final boolean current;
 
     public Segment(SegmentStore store, UUID uuid, ByteBuffer data) {
         this.store = checkNotNull(store);
@@ -117,10 +119,28 @@ public class Segment {
 
         int refpos = data.position();
         if (isDataSegmentId(uuid)) {
+            int refs = data.get(refpos) & 0xff;
             int roots = data.getShort(refpos + 1) & 0xffff;
             refpos += align(3 + roots * 3);
+            refids = new UUID[refs];
+            for (int i = 0; i < refs; i++) {
+                refids[i] = new UUID(
+                        data.getLong(refpos + i * 16),
+                        data.getLong(refpos + i * 16 + 8));
+            }
+        } else {
+            refids = NO_REFS;
         }
-        this.refposition = refpos;
+
+        this.current = false;
+    }
+
+    Segment(SegmentStore store, UUID uuid, UUID[] refids, ByteBuffer data) {
+        this.store = checkNotNull(store);
+        this.uuid = checkNotNull(uuid);
+        this.refids = checkNotNull(refids);
+        this.data = checkNotNull(data);
+        this.current = true;
     }
 
     /**
@@ -154,18 +174,7 @@ public class Segment {
     }
 
     public List<UUID> getReferencedIds() {
-        if (isDataSegmentId(uuid)) {
-            int refcount = data.get(data.position()) & 0xff;
-            List<UUID> refs = newArrayListWithCapacity(refcount);
-            for (int i = 0; i < refcount; i++) {
-                refs.add(new UUID(
-                        data.getLong(refposition + i * 16),
-                        data.getLong(refposition + i * 16 + 8)));
-            }
-            return refs;
-        } else {
-            return emptyList();
-        }
+        return Arrays.asList(refids);
     }
 
     public int size() {
@@ -226,8 +235,7 @@ public class Segment {
         UUID refid;
         int refpos = data.get(pos) & 0xff;
         if (refpos != 0xff) {
-            refpos = refposition + refpos * 16;
-            refid = new UUID(data.getLong(refpos), data.getLong(refpos + 8));
+            refid = refids[refpos];
         } else {
             refid = uuid;
         }
@@ -412,27 +420,26 @@ public class Segment {
         StringWriter string = new StringWriter();
         PrintWriter writer = new PrintWriter(string);
 
-        int pos = refposition;
-        int refcount = data.get(data.position()) & 0xff;
-        int rootcount = data.getShort(data.position() + 1) &0xffff;
-        int length =
-                data.capacity() - (align(3 + rootcount * 3) + refcount * 16);
+        int rootcount = 0;
+        int length = data.remaining();
+        if (!current) {
+            rootcount = data.getShort(data.position() + 1) &0xffff;
+            length -= (align(3 + rootcount * 3) + refids.length * 16);
+        }
 
         writer.format(
                 "Segment %s (%d bytes, %d ref%s, %d root%s)%n",
                 uuid, length,
-                refcount, (refcount != 1 ? "s" : ""),
+                refids.length, (refids.length != 1 ? "s" : ""),
                 rootcount, (rootcount != 1 ? "s" : ""));
         writer.println("--------------------------------------------------------------------------");
-        if (refcount > 0) {
-            for (int i = 0; i < refcount; i++) {
-                UUID id = new UUID(data.getLong(pos), data.getLong(pos + 8));
-                writer.format("reference %02x: %s%n", i, id);
-                pos += 16;
+        if (refids.length > 0) {
+            for (int i = 0; i < refids.length; i++) {
+                writer.format("reference %02x: %s%n", i, refids[i]);
             }
             writer.println("--------------------------------------------------------------------------");
         }
-        pos = data.limit() - ((length + 15) & ~15);
+        int pos = data.limit() - ((length + 15) & ~15);
         while (pos < data.limit()) {
             writer.format("%04x: ", (MAX_SEGMENT_SIZE - data.limit() + pos) >> RECORD_ALIGN_BITS);
             for (int i = 0; i < 16; i++) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Wed Dec  4 03:55:56 2013
@@ -116,7 +116,7 @@ public class SegmentWriter {
      * The segment write buffer, filled from the end to the beginning
      * (see OAK-629).
      */
-    private final byte[] buffer = new byte[MAX_SEGMENT_SIZE];
+    private byte[] buffer = new byte[MAX_SEGMENT_SIZE];
 
     /**
      * The number of bytes already written (or allocated). Counted from
@@ -168,12 +168,12 @@ public class SegmentWriter {
     public synchronized Segment getCurrentSegment(UUID id) {
         if (equal(id, uuid)) {
             if (currentSegment == null) {
-                int header = align(3 + roots.size() * 3) + 16 * refids.size();
-                ByteBuffer b = ByteBuffer.allocate(header + length);
-                writeSegmentHeader(b);
-                b.put(buffer, buffer.length - length, length);
-                b.rewind();
-                currentSegment = new Segment(store, uuid, b);
+                ByteBuffer b = ByteBuffer.wrap(buffer);
+                b.position(buffer.length - length);
+                currentSegment = new Segment(
+                        store, uuid,
+                        refids.keySet().toArray(new UUID[refids.size()]),
+                        b);
             }
             return currentSegment;
         } else {
@@ -196,6 +196,7 @@ public class SegmentWriter {
             store.writeSegment(uuid, buffer, buffer.length - length, length);
 
             uuid = newDataSegmentId();
+            buffer = new byte[MAX_SEGMENT_SIZE];
             refids.clear();
             roots.clear();
             length = 0;
@@ -221,10 +222,7 @@ public class SegmentWriter {
             }
         }
         int refCount = refids.size() + segmentIds.size();
-
-        Set<RecordId> rootIds = newHashSet(roots.keySet());
-        rootIds.removeAll(ids);
-        int rootCount = rootIds.size() + 1;
+        int rootCount = roots.size() + 1;
 
         int recordSize = Segment.align(size + ids.size() * Segment.RECORD_ID_BYTES);
         int headerSize = Segment.align(3 + rootCount * 3);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Wed Dec  4 03:55:56 2013
@@ -16,11 +16,11 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.file;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newConcurrentMap;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory.isBulkSegmentId;
 
@@ -30,7 +30,6 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import javax.annotation.Nonnull;
@@ -63,7 +62,13 @@ public class FileStore extends AbstractS
 
     private final RandomAccessFile journalFile;
 
-    private final Map<String, RecordId> journals = newConcurrentMap();
+    private volatile RecordId head;
+
+    private volatile boolean updated = false;
+
+    private volatile boolean alive = true;
+
+    private final Thread flushThread;
 
     public FileStore(File directory, int maxFileSizeMB, boolean memoryMapping)
             throws IOException {
@@ -103,46 +108,64 @@ public class FileStore extends AbstractS
             }
         }
 
+        head = null;
         journalFile = new RandomAccessFile(
                 new File(directory, JOURNAL_FILE_NAME), "rw");
         String line = journalFile.readLine();
         while (line != null) {
             int space = line.indexOf(' ');
             if (space != -1) {
-                String name = line.substring(space + 1);
-                RecordId id = RecordId.fromString(line.substring(0, space));
-                journals.put(name, id);
+                head = RecordId.fromString(line.substring(0, space));
             }
             line = journalFile.readLine();
         }
 
-        if (!journals.containsKey("root")) {
+        if (head == null) {
             NodeBuilder builder = EMPTY_NODE.builder();
             builder.setChildNode("root", initial);
             SegmentNodeState root =
                     getWriter().writeNode(builder.getNodeState());
-            journals.put("root", root.getRecordId());
-            journalFile.writeBytes(root.getRecordId() + " root\n");
+            head = root.getRecordId();
+            updated = true;
         }
-    }
 
-    RecordId getHead(String name) {
-        return journals.get(name);
+        this.flushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    synchronized (flushThread) {
+                        flushThread.wait(1000);
+                        while (alive) {
+                            flush();
+                            flushThread.wait(5000);
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // stop flushing
+                }
+            }
+        });
+        flushThread.setName("TarMK flush thread " + directory);
+        flushThread.setDaemon(true);
+        flushThread.setPriority(Thread.MIN_PRIORITY);
+        flushThread.start();
     }
 
-    synchronized boolean setHead(String name, RecordId base, RecordId head) {
-        if (base.equals(journals.get(name))) {
-            getWriter().flush();
+    private synchronized void flush() {
+        if (updated) {
             try {
-                journalFile.writeBytes(head + " " + name + "\n");
-                journals.put(name, head);
-                return true;
+                getWriter().flush();
+                for (TarFile file : bulkFiles) {
+                    file.flush();
+                }
+                for (TarFile file : dataFiles) {
+                    file.flush();
+                }
+                journalFile.writeBytes(head + " root\n");
+                journalFile.getChannel().force(false);
             } catch (IOException e) {
-                throw new IllegalStateException(
-                        "Failed to update journal " + name, e);
+                e.printStackTrace(); // FIXME
             }
-        } else {
-            return false;
         }
     }
 
@@ -162,6 +185,13 @@ public class FileStore extends AbstractS
         try {
             super.close();
 
+            alive = false;
+            synchronized (flushThread) {
+                flushThread.notify();
+            }
+            flushThread.join();
+            flush();
+
             journalFile.close();
 
             for (TarFile file : bulkFiles) {
@@ -181,12 +211,29 @@ public class FileStore extends AbstractS
 
     @Override
     public Journal getJournal(String name) {
-        synchronized (journals) {
-            if (journals.containsKey(name)) {
-                journals.put(name, journals.get("root"));
+        checkArgument("root".equals(name)); // only root supported for now
+        return new Journal() {
+            @Override
+            public RecordId getHead() {
+                return head;
             }
-        }
-        return new FileJournal(this, name);
+            @Override
+            public boolean setHead(RecordId base, RecordId head) {
+                synchronized (FileStore.this) {
+                    if (base.equals(FileStore.this.head)) {
+                        FileStore.this.head = head;
+                        updated = true;
+                        return true;
+                    } else {
+                        return false;
+                    }
+                }
+            }
+            @Override
+            public void merge() {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 
     @Override @Nonnull

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
Wed Dec  4 03:55:56 2013
@@ -28,12 +28,15 @@ class MappedAccess implements FileAccess
 
     private final MappedByteBuffer buffer;
 
+    private boolean updated = false;
+
     MappedAccess(File file, int length) throws IOException {
         RandomAccessFile f = new RandomAccessFile(file, "rw");
         try {
             long l = f.length();
             if (l == 0) { // it's a new file
                 l = length;
+                updated = true;
             }
             buffer = f.getChannel().map(READ_WRITE, 0, l);
         } finally {
@@ -55,16 +58,21 @@ class MappedAccess implements FileAccess
     }
 
     @Override
-    public void write(int position, byte[] b, int offset, int length)
+    public synchronized void write(
+            int position, byte[] b, int offset, int length)
             throws IOException {
         ByteBuffer entry = buffer.duplicate();
         entry.position(position);
         entry.put(b, offset, length);
+        updated = true;
     }
 
     @Override
-    public void flush() {
-        buffer.force();
+    public synchronized void flush() {
+        if (updated) {
+            buffer.force();
+            updated = false;
+        }
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
Wed Dec  4 03:55:56 2013
@@ -179,8 +179,13 @@ class TarFile {
         return true;
     }
 
-    void close() throws IOException {
+    public void flush() throws IOException {
         access.flush();
+    }
+
+
+    void close() throws IOException {
+        flush();
         access.close();
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
Wed Dec  4 03:55:56 2013
@@ -154,6 +154,7 @@ public class SegmentSizeTest {
         }
 
         SegmentNodeState state = writer.writeNode(builder.getNodeState());
+        writer.flush();
         Segment segment = store.readSegment(state.getRecordId().getSegmentId());
         assertEquals(26752, Segment.WEIGHER.weigh(null, segment));
 
@@ -162,6 +163,7 @@ public class SegmentSizeTest {
         builder = state.builder();
         builder.child("child1000");
         state = writer.writeNode(builder.getNodeState());
+        writer.flush();
         segment = store.readSegment(state.getRecordId().getSegmentId());
         assertEquals(136, Segment.WEIGHER.weigh(null, segment));
     }
@@ -170,6 +172,7 @@ public class SegmentSizeTest {
         SegmentStore store = new MemoryStore();
         SegmentWriter writer = store.getWriter();
         RecordId id = writer.writeNode(builder.getNodeState()).getRecordId();
+        writer.flush();
         Segment segment = store.readSegment(id.getSegmentId());
         return Segment.WEIGHER.weigh(null, segment);
     }
@@ -179,6 +182,7 @@ public class SegmentSizeTest {
         SegmentWriter writer = store.getWriter();
         NodeState state = builder.getNodeState();
         RecordId id = writer.writeNode(state).getRecordId();
+        writer.flush();
         Segment segment = store.readSegment(id.getSegmentId());
         int base = Segment.WEIGHER.weigh(null, segment);
 
@@ -186,6 +190,7 @@ public class SegmentSizeTest {
         writer = store.getWriter();
         writer.writeNode(state);
         id = writer.writeNode(state).getRecordId();
+        writer.flush();
         segment = store.readSegment(id.getSegmentId());
         return Segment.WEIGHER.weigh(null, segment) - base - 4;
     }



Mime
View raw message