jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1548354 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentNodeStore.java file/FileStore.java
Date Fri, 06 Dec 2013 01:22:27 GMT
Author: jukka
Date: Fri Dec  6 01:22:26 2013
New Revision: 1548354

URL: http://svn.apache.org/r1548354
Log:
OAK-593: Segment-based MK

Use AtomicReferences to track the latest head state

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1548354&r1=1548353&r2=1548354&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Fri Dec  6 01:22:26 2013
@@ -30,6 +30,7 @@ import java.io.InputStream;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -61,7 +62,7 @@ public class SegmentNodeStore implements
     /**
      * Local copy of the head of the journal associated with this store.
      */
-    private volatile SegmentNodeState head;
+    private final AtomicReference<SegmentNodeState> head;
 
     /**
      * Semaphore that controls access to the {@link #head} variable.
@@ -75,8 +76,8 @@ public class SegmentNodeStore implements
     public SegmentNodeStore(SegmentStore store, String journal) {
         this.store = store;
         this.journal = store.getJournal(journal);
-        this.head = new SegmentNodeState(
-                store.getWriter().getDummySegment(), this.journal.getHead());
+        this.head = new AtomicReference<SegmentNodeState>(new SegmentNodeState(
+                store.getWriter().getDummySegment(), this.journal.getHead()));
         this.changeDispatcher = new ChangeDispatcher(getRoot());
     }
 
@@ -93,15 +94,16 @@ public class SegmentNodeStore implements
     }
 
     /**
-     * Refreshes the head state. Does nothing if a concurrent local commit is
-     * in progress, as that commit will automatically refresh the head state.
+     * Refreshes the head state. Should only be called while holding a
+     * permit from the {@link #commitSemaphore}.
      */
     private void refreshHead() {
         RecordId id = journal.getHead();
-        if (!id.equals(head.getRecordId())) {
-            head = new SegmentNodeState(
+        if (!id.equals(head.get().getRecordId())) {
+            SegmentNodeState state = new SegmentNodeState(
                     store.getWriter().getDummySegment(), id);
-            changeDispatcher.contentChanged(head.getChildNode(ROOT), null);
+            head.set(state);
+            changeDispatcher.contentChanged(state.getChildNode(ROOT), null);
         }
     }
 
@@ -119,7 +121,7 @@ public class SegmentNodeStore implements
                 commitSemaphore.release();
             }
         }
-        return head.getChildNode(ROOT);
+        return head.get().getChildNode(ROOT);
     }
 
     @Override
@@ -196,14 +198,14 @@ public class SegmentNodeStore implements
                 try {
                     refreshHead();
 
-                    SegmentNodeState ns = head;
-                    RecordId ri = head.getRecordId();
+                    SegmentNodeState state = head.get();
+                    RecordId ri = state.getRecordId();
 
-                    SegmentNodeBuilder builder = ns.builder();
+                    SegmentNodeBuilder builder = state.builder();
                     NodeBuilder cp = builder.child(name);
                     cp.setProperty("timestamp", System.currentTimeMillis()
                             + lifetime);
-                    cp.setChildNode(ROOT, ns.getChildNode(ROOT));
+                    cp.setChildNode(ROOT, state.getChildNode(ROOT));
 
                     if (journal.setHead(ri, builder.getNodeState()
                             .getRecordId())) {
@@ -222,7 +224,7 @@ public class SegmentNodeStore implements
 
     @Override @CheckForNull
     public NodeState retrieve(@Nonnull String checkpoint) {
-        NodeState cp = head.getChildNode(checkpoint).getChildNode(ROOT);
+        NodeState cp = head.get().getChildNode(checkpoint).getChildNode(ROOT);
         if (cp.exists()) {
             return cp;
         }
@@ -252,13 +254,13 @@ public class SegmentNodeStore implements
         }
 
         private boolean setHead(SegmentNodeBuilder builder) {
-            SegmentNodeState base = builder.getBaseState();
-            SegmentNodeState head = builder.getNodeState();
+            SegmentNodeState before = builder.getBaseState();
+            SegmentNodeState after = builder.getNodeState();
 
             refreshHead();
-            if (journal.setHead(base.getRecordId(), head.getRecordId())) {
-                SegmentNodeStore.this.head = head;
-                changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
+            if (journal.setHead(before.getRecordId(), after.getRecordId())) {
+                head.set(after);
+                changeDispatcher.contentChanged(after.getChildNode(ROOT), info);
                 refreshHead();
                 return true;
             } else {
@@ -267,8 +269,9 @@ public class SegmentNodeStore implements
         }
 
         private SegmentNodeBuilder prepare() throws CommitFailedException {
-            SegmentNodeBuilder builder = head.builder();
-            if (fastEquals(before, head.getChildNode(ROOT))) {
+            SegmentNodeState state = head.get();
+            SegmentNodeBuilder builder = state.builder();
+            if (fastEquals(before, state.getChildNode(ROOT))) {
                 // use a shortcut when there are no external changes
                 builder.setChildNode(ROOT, hook.processCommit(before, after));
             } else {
@@ -293,8 +296,9 @@ public class SegmentNodeStore implements
                 long start = System.nanoTime();
 
                 refreshHead();
-                if (head.hasProperty("token")
-                        && head.getLong("timeout") >= currentTimeMillis()) {
+                SegmentNodeState state = head.get();
+                if (state.hasProperty("token")
+                        && state.getLong("timeout") >= currentTimeMillis()) {
                     // someone else has a pessimistic lock on the journal,
                     // so we should not try to commit anything yet
                 } else {
@@ -321,15 +325,16 @@ public class SegmentNodeStore implements
                 throws CommitFailedException, InterruptedException {
             while (true) {
                 long now = currentTimeMillis();
-                if (head.hasProperty("token")
-                        && head.getLong("timeout") >= now) {
+                SegmentNodeState state = head.get();
+                if (state.hasProperty("token")
+                        && state.getLong("timeout") >= now) {
                     // locked by someone else, wait until unlocked or expired
                     Thread.sleep(
-                            Math.min(head.getLong("timeout") - now, 1000),
+                            Math.min(state.getLong("timeout") - now, 1000),
                             random.nextInt(1000000));
                 } else {
                     // attempt to acquire the lock
-                    SegmentNodeBuilder builder = head.builder();
+                    SegmentNodeBuilder builder = state.builder();
                     builder.setProperty("token", UUID.randomUUID().toString());
                     builder.setProperty("timeout", now + timeout);
 
@@ -358,7 +363,7 @@ public class SegmentNodeStore implements
                     pessimisticMerge(timeout);
                 }
             }
-            return head.getChildNode(ROOT);
+            return head.get().getChildNode(ROOT);
         }
 
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1548354&r1=1548353&r2=1548354&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Fri Dec  6 01:22:26 2013
@@ -33,6 +33,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nonnull;
 
@@ -40,7 +41,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.Journal;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
@@ -68,9 +68,16 @@ public class FileStore extends AbstractS
 
     private final RandomAccessFile journalFile;
 
-    private volatile RecordId head;
+    /**
+     * The latest head of the root journal.
+     */
+    private final AtomicReference<RecordId> head;
 
-    private volatile boolean updated = false;
+    /**
+     * The persisted head of the root journal, used to determine whether the
+     * latest {@link #head} value should be written to the disk.
+     */
+    private RecordId persistedHead = null;
 
     /**
      * The background flush thread. Automatically flushes the TarMK state
@@ -94,7 +101,8 @@ public class FileStore extends AbstractS
         this(directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB, memoryMapping);
     }
 
-    public FileStore(File directory, NodeState initial, int maxFileSizeMB,
+    public FileStore(
+            final File directory, NodeState initial, int maxFileSizeMB,
             int cacheSizeMB, boolean memoryMapping) throws IOException {
         super(cacheSizeMB);
         checkNotNull(directory).mkdirs();
@@ -122,25 +130,24 @@ public class FileStore extends AbstractS
             }
         }
 
-        head = null;
         journalFile = new RandomAccessFile(
                 new File(directory, JOURNAL_FILE_NAME), "rw");
         String line = journalFile.readLine();
         while (line != null) {
             int space = line.indexOf(' ');
             if (space != -1) {
-                head = RecordId.fromString(line.substring(0, space));
+                persistedHead = RecordId.fromString(line.substring(0, space));
             }
             line = journalFile.readLine();
         }
 
-        if (head == null) {
+        if (persistedHead != null) {
+            head = new AtomicReference<RecordId>(persistedHead);
+        } else {
             NodeBuilder builder = EMPTY_NODE.builder();
             builder.setChildNode("root", initial);
-            SegmentNodeState root =
-                    getWriter().writeNode(builder.getNodeState());
-            head = root.getRecordId();
-            updated = true;
+            head = new AtomicReference<RecordId>(
+                    getWriter().writeNode(builder.getNodeState()).getRecordId());
         }
 
         this.flushThread = new Thread(new Runnable() {
@@ -153,7 +160,7 @@ public class FileStore extends AbstractS
                             flush();
                         } catch (IOException e) {
                             log.warn("Failed to flush the TarMK at" +
-                                    FileStore.this.directory, e);
+                                    directory, e);
                         }
                         timeToClose.await(5, SECONDS);
                     }
@@ -169,7 +176,8 @@ public class FileStore extends AbstractS
     }
 
     public synchronized void flush() throws IOException {
-        if (updated) {
+        RecordId id = head.get();
+        if (!id.equals(persistedHead)) {
             getWriter().flush();
             for (TarFile file : bulkFiles) {
                 file.flush();
@@ -177,8 +185,9 @@ public class FileStore extends AbstractS
             for (TarFile file : dataFiles) {
                 file.flush();
             }
-            journalFile.writeBytes(head + " root\n");
+            journalFile.writeBytes(id + " root\n");
             journalFile.getChannel().force(false);
+            persistedHead = id;
         }
     }
 
@@ -235,19 +244,12 @@ public class FileStore extends AbstractS
         return new Journal() {
             @Override
             public RecordId getHead() {
-                return head;
+                return head.get();
             }
             @Override
-            public boolean setHead(RecordId base, RecordId head) {
-                synchronized (FileStore.this) {
-                    if (base.equals(FileStore.this.head)) {
-                        updated = !head.equals(FileStore.this.head);
-                        FileStore.this.head = head;
-                        return true;
-                    } else {
-                        return false;
-                    }
-                }
+            public boolean setHead(RecordId before, RecordId after) {
+                RecordId id = head.get();
+                return id.equals(before) && head.compareAndSet(id, after);
             }
             @Override
             public void merge() {



Mime
View raw message