jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1547664 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentNodeStore.java SegmentRootState.java
Date Wed, 04 Dec 2013 01:02:36 GMT
Author: jukka
Date: Wed Dec  4 01:02:36 2013
New Revision: 1547664

URL: http://svn.apache.org/r1547664
Log:
OAK-593: Segment-based MK

Further simplification of the SegmentMK commit/rebase logic

Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentRootState.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1547664&r1=1547663&r2=1547664&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Wed Dec  4 01:02:36 2013
@@ -16,8 +16,10 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static com.google.common.base.Objects.equal;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -45,8 +47,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 
-import com.google.common.base.Objects;
-
 public class SegmentNodeStore implements NodeStore, Observable {
 
     static final String ROOT = "root";
@@ -111,26 +111,25 @@ public class SegmentNodeStore implements
                 commitSemaphore.release();
             }
         }
-        return new SegmentRootState(head);
+        return head.getChildNode(ROOT);
     }
 
     @Override
     public NodeState merge(
             @Nonnull NodeBuilder builder, @Nonnull CommitHook commitHook,
             @Nullable CommitInfo info) throws CommitFailedException {
+        checkArgument(builder instanceof SegmentNodeBuilder);
         checkNotNull(commitHook);
 
-        NodeState base = builder.getBaseState();
-        checkArgument(store.isInstance(base, SegmentRootState.class));
-        SegmentNodeState root = ((SegmentRootState) base).getRootState();
+        SegmentNodeBuilder snb = (SegmentNodeBuilder) builder;
+        checkArgument(store == snb.getBaseState().getStore());
 
         try {
             commitSemaphore.acquire();
             try {
-                Commit commit = new Commit(
-                        root, builder.getNodeState(), commitHook, info);
+                Commit commit = new Commit(snb, commitHook, info);
                 NodeState merged = commit.execute();
-                ((SegmentNodeBuilder) builder).reset(merged);
+                snb.reset(merged);
                 return merged;
             } finally {
                 commitSemaphore.release();
@@ -154,14 +153,6 @@ public class SegmentNodeStore implements
         return builder.getNodeState();
     }
 
-    private boolean fastEquals(Object a, Object b) {
-        return store.isInstance(a, Record.class)
-                && store.isInstance(b, Record.class)
-                && Objects.equal(
-                        ((Record) a).getRecordId(),
-                        ((Record) b).getRecordId());
-    }
-
     @Override @Nonnull
     public NodeState reset(@Nonnull NodeBuilder builder) {
         checkArgument(builder instanceof SegmentNodeBuilder);
@@ -223,30 +214,31 @@ public class SegmentNodeStore implements
 
         private final Random random = new Random();
 
-        private SegmentNodeState base;
+        private SegmentNodeState before;
 
-        private SegmentNodeState head;
+        private SegmentNodeState after;
 
         private final CommitHook hook;
 
         private final CommitInfo info;
 
-        Commit(@Nonnull SegmentNodeState base, @Nonnull NodeState head,
+        Commit(@Nonnull SegmentNodeBuilder builder,
                 @Nonnull CommitHook hook, @Nullable CommitInfo info) {
-            this.base = checkNotNull(base);
-            SegmentNodeBuilder builder = base.builder();
-            builder.setChildNode(ROOT, checkNotNull(head));
-            this.head = builder.getNodeState();
+            checkNotNull(builder);
+            this.before = builder.getBaseState();
+            this.after = builder.getNodeState();
 
             this.hook = checkNotNull(hook);
             this.info = info;
         }
 
-        private boolean setHead(
-                SegmentNodeState base, SegmentNodeState head, CommitInfo info) {
+        private boolean setHead(SegmentNodeBuilder builder) {
+            SegmentNodeState base = builder.getBaseState();
+            SegmentNodeState head = builder.getNodeState();
+
             refreshHead();
             if (journal.setHead(base.getRecordId(), head.getRecordId())) {
-                this.head = head;
+                SegmentNodeStore.this.head = head;
                 changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
                 refreshHead();
                 return true;
@@ -255,55 +247,46 @@ public class SegmentNodeStore implements
             }
         }
 
-        private void rebase() {
-            SegmentNodeState newBase = SegmentNodeStore.this.head;
-            if (!base.getRecordId().equals(newBase.getRecordId())) {
-                NodeBuilder builder = newBase.builder();
-                head.getChildNode(ROOT).compareAgainstBaseState(
-                        base.getChildNode(ROOT),
-                        new ConflictAnnotatingRebaseDiff(builder.child(ROOT)));
-                base = newBase;
-                head = store.getWriter().writeNode(builder.getNodeState());
+        private SegmentNodeBuilder prepare() throws CommitFailedException {
+            SegmentNodeBuilder builder = head.builder();
+            if (fastEquals(before, head.getChildNode(ROOT))) {
+                // use a shortcut when there are no external changes
+                builder.setChildNode(ROOT, hook.processCommit(before, after));
+            } else {
+                // there were some external changes, so do the full rebase
+                ConflictAnnotatingRebaseDiff diff =
+                        new ConflictAnnotatingRebaseDiff(builder.child(ROOT));
+                after.compareAgainstBaseState(before, diff);
+                // apply commit hooks on the rebased changes
+                builder.setChildNode(ROOT, hook.processCommit(
+                        builder.getBaseState().getChildNode(ROOT),
+                        builder.getNodeState().getChildNode(ROOT)));
             }
+            return builder;
         }
 
-        private long optimisticMerge(CommitHook hook, CommitInfo info)
+        private long optimisticMerge()
                 throws CommitFailedException, InterruptedException {
             long timeout = 1;
 
-            SegmentNodeState originalBase = base;
-            SegmentNodeState originalHead = head;
-
             // use exponential backoff in case of concurrent commits
             for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
-                rebase(); // rebase to latest head, a no-op if already there
-
                 long start = System.nanoTime();
 
-                if (base.hasProperty("token")
-                        && base.getLong("timeout") >= System.currentTimeMillis()) {
+                refreshHead();
+                if (head.hasProperty("token")
+                        && head.getLong("timeout") >= currentTimeMillis()) {
                     // someone else has a pessimistic lock on the journal,
-                    // so we should not try to commit anything
+                    // so we should not try to commit anything yet
                 } else {
-                    // apply commit hooks on the rebased changes
-                    NodeBuilder builder = head.builder();
-                    builder.setChildNode(ROOT, hook.processCommit(
-                            base.getChildNode(ROOT), head.getChildNode(ROOT)));
-                    SegmentNodeState newHead =
-                            store.getWriter().writeNode(builder.getNodeState());
-
+                    SegmentNodeBuilder builder = prepare();
                     // use optimistic locking to update the journal
-                    if (setHead(base, newHead, info)) {
-                        base = newHead;
-                        head = newHead;
+                    if (setHead(builder)) {
                         return -1;
                     }
                 }
 
-                // someone else was faster, so restore state and retry later
-                base = originalBase;
-                head = originalHead;
-
+                // someone else was faster, so wait a while and retry later
                 Thread.sleep(backoff, random.nextInt(1000000));
 
                 long stop = System.nanoTime();
@@ -315,49 +298,31 @@ public class SegmentNodeStore implements
             return MILLISECONDS.convert(timeout, NANOSECONDS);
         }
 
-        private void pessimisticMerge(
-                CommitHook hook, long timeout, CommitInfo info)
+        private void pessimisticMerge(long timeout)
                 throws CommitFailedException, InterruptedException {
             while (true) {
-                SegmentNodeState before = head;
-                long now = System.currentTimeMillis();
-                if (before.hasProperty("token")
-                        && before.getLong("timeout") >= now) {
+                long now = currentTimeMillis();
+                if (head.hasProperty("token")
+                        && head.getLong("timeout") >= now) {
                     // locked by someone else, wait until unlocked or expired
                     Thread.sleep(
-                            Math.min(before.getLong("timeout") - now, 1000),
+                            Math.min(head.getLong("timeout") - now, 1000),
                             random.nextInt(1000000));
                 } else {
                     // attempt to acquire the lock
-                    NodeBuilder builder = before.builder();
+                    SegmentNodeBuilder builder = head.builder();
                     builder.setProperty("token", UUID.randomUUID().toString());
                     builder.setProperty("timeout", now + timeout);
 
-                    SegmentNodeState after =
-                            store.getWriter().writeNode(builder.getNodeState());
-                    if (setHead(before, after, info)) {
-                        SegmentNodeState originalBase = base;
-                        SegmentNodeState originalHead = head;
-
-                        // lock acquired; rebase, apply commit hooks, and unlock
-                        rebase();
-                        builder.setChildNode(ROOT, hook.processCommit(
-                                base.getChildNode(ROOT), head.getChildNode(ROOT)));
+                    if (setHead(builder)) {
+                         // lock acquired; rebase, apply commit hooks, and unlock
+                        builder = prepare();
                         builder.removeProperty("token");
                         builder.removeProperty("timeout");
 
                         // complete the commit
-                        SegmentNodeState newHead =
-                                store.getWriter().writeNode(builder.getNodeState());
-                        if (setHead(after, newHead, info)) {
-                            base = newHead;
-                            head = newHead;
+                        if (setHead(builder)) {
                             return;
-                        } else {
-                            // something else happened, perhaps a timeout, so
-                            // undo the previous rebase and try again
-                            base = originalBase;
-                            head = originalHead;
                         }
                     }
                 }
@@ -365,17 +330,30 @@ public class SegmentNodeStore implements
         }
 
         @Nonnull
-        SegmentRootState execute()
+        NodeState execute()
                 throws CommitFailedException, InterruptedException {
-            if (base != head) {
-                long timeout = optimisticMerge(hook, info);
+            // only do the merge if there are some changes to commit
+            if (!fastEquals(before, after)) {
+                long timeout = optimisticMerge();
                 if (timeout >= 0) {
-                    pessimisticMerge(hook, timeout, info);
+                    pessimisticMerge(timeout);
                 }
             }
-            return new SegmentRootState(head);
+            return head.getChildNode(ROOT);
         }
 
     }
 
+    private static boolean fastEquals(Object a, Object b) {
+        return a instanceof Record && fastEquals((Record) a, b);
+    }
+
+    private static boolean fastEquals(Record a, Object b) {
+        return b instanceof Record && fastEquals(a, (Record) b);
+    }
+
+    private static boolean fastEquals(Record a, Record b) {
+        return equal(a.getRecordId(), b.getRecordId());
+    }
+
 }



Mime
View raw message