Return-Path: X-Original-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Delivered-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 795B310C8F for ; Tue, 3 Dec 2013 01:03:23 +0000 (UTC) Received: (qmail 67835 invoked by uid 500); 3 Dec 2013 01:03:23 -0000 Delivered-To: apmail-jackrabbit-oak-commits-archive@jackrabbit.apache.org Received: (qmail 67817 invoked by uid 500); 3 Dec 2013 01:03:23 -0000 Mailing-List: contact oak-commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: oak-dev@jackrabbit.apache.org Delivered-To: mailing list oak-commits@jackrabbit.apache.org Received: (qmail 67809 invoked by uid 99); 3 Dec 2013 01:03:23 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Dec 2013 01:03:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Dec 2013 01:03:21 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A5838238883D; Tue, 3 Dec 2013 01:03:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1547253 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentNodeStore.java SegmentNodeStoreBranch.java Date: Tue, 03 Dec 2013 01:03:01 -0000 To: oak-commits@jackrabbit.apache.org From: jukka@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131203010301.A5838238883D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jukka Date: Tue Dec 3 01:03:01 2013 New Revision: 1547253 URL: http://svn.apache.org/r1547253 Log: OAK-593: Segment-based MK Inline SegmentNodeStoreBranch into SegmentNodeStore in order to further simplify the code Removed: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1547253&r1=1547252&r2=1547253&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue Dec 3 01:03:01 2013 @@ -19,11 +19,14 @@ package org.apache.jackrabbit.oak.plugin import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.Random; +import java.util.UUID; import java.util.concurrent.Semaphore; import javax.annotation.CheckForNull; @@ -94,25 +97,6 @@ public class SegmentNodeStore implements } } - boolean setHead( - SegmentNodeState base, SegmentNodeState head, CommitInfo info) - throws InterruptedException { - commitSemaphore.acquire(); - try { - refreshHead(); - if (journal.setHead(base.getRecordId(), head.getRecordId())) { - this.head = head; - changeDispatcher.contentChanged(head.getChildNode(ROOT), info); - refreshHead(); - return true; - } else { - return false; - } - } finally { - commitSemaphore.release(); - } - } - @Override public Closeable addObserver(Observer observer) { return changeDispatcher.addObserver(observer); @@ -140,9 +124,8 @@ public class SegmentNodeStore implements checkArgument(store.isInstance(base, SegmentRootState.class)); SegmentNodeState root = ((SegmentRootState) base).getRootState(); - SegmentNodeStoreBranch branch = new SegmentNodeStoreBranch( - this, store.getWriter(), root, maximumBackoff); - branch.setRoot(builder.getNodeState()); + SegmentNodeStoreBranch branch = + new SegmentNodeStoreBranch(root, builder.getNodeState()); NodeState merged = branch.merge(commitHook, info); ((SegmentNodeBuilder) builder).reset(merged); return merged; @@ -172,7 +155,6 @@ public class SegmentNodeStore implements ((Record) b).getRecordId()); } - @Override @Nonnull public NodeState reset(@Nonnull NodeBuilder builder) { checkArgument(builder instanceof SegmentRootBuilder); @@ -201,4 +183,172 @@ public class SegmentNodeStore implements new SegmentNodeState(store.getWriter().getDummySegment(), id); return root.getChildNode(ROOT); } + + private static final Random RANDOM = new Random(); + + private class SegmentNodeStoreBranch { + + private SegmentNodeState base; + + private SegmentNodeState head; + + SegmentNodeStoreBranch(SegmentNodeState base, NodeState head) { + this.base = base; + SegmentRootBuilder builder = base.builder(); + builder.setChildNode(ROOT, head); + this.head = builder.getNodeState(); + } + + private boolean setHead( + SegmentNodeState base, SegmentNodeState head, CommitInfo info) + throws InterruptedException { + commitSemaphore.acquire(); + try { + refreshHead(); + if (journal.setHead(base.getRecordId(), head.getRecordId())) { + this.head = head; + changeDispatcher.contentChanged(head.getChildNode(ROOT), info); + refreshHead(); + return true; + } else { + return false; + } + } finally { + commitSemaphore.release(); + } + } + + private void rebase() { + SegmentNodeState newBase = SegmentNodeStore.this.head; + if (!base.getRecordId().equals(newBase.getRecordId())) { + NodeBuilder builder = newBase.builder(); + head.getChildNode(ROOT).compareAgainstBaseState( + base.getChildNode(ROOT), + new ConflictAnnotatingRebaseDiff(builder.child(ROOT))); + base = newBase; + head = store.getWriter().writeNode(builder.getNodeState()); + } + } + + private long optimisticMerge(CommitHook hook, CommitInfo info) + throws CommitFailedException, InterruptedException { + long timeout = 1; + + SegmentNodeState originalBase = base; + SegmentNodeState originalHead = head; + + // use exponential backoff in case of concurrent commits + for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) { + rebase(); // rebase to latest head, a no-op if already there + + long start = System.nanoTime(); + + if (base.hasProperty("token") + && base.getLong("timeout") >= System.currentTimeMillis()) { + // someone else has a pessimistic lock on the journal, + // so we should not try to commit anything + } else { + // apply commit hooks on the rebased changes + NodeBuilder builder = head.builder(); + builder.setChildNode(ROOT, hook.processCommit( + base.getChildNode(ROOT), head.getChildNode(ROOT))); + SegmentNodeState newHead = + store.getWriter().writeNode(builder.getNodeState()); + + // use optimistic locking to update the journal + if (setHead(base, newHead, info)) { + base = newHead; + head = newHead; + return -1; + } + } + + // someone else was faster, so restore state and retry later + base = originalBase; + head = originalHead; + + RANDOM.wait(backoff, RANDOM.nextInt(1000000)); + + long stop = System.nanoTime(); + if (stop - start > timeout) { + timeout = stop - start; + } + } + + return MILLISECONDS.convert(timeout, NANOSECONDS); + } + + private void pessimisticMerge( + CommitHook hook, long timeout, CommitInfo info) + throws CommitFailedException, InterruptedException { + while (true) { + SegmentNodeState before = head; + long now = System.currentTimeMillis(); + if (before.hasProperty("token") + && before.getLong("timeout") >= now) { + // locked by someone else, wait until unlocked or expired + RANDOM.wait( + Math.min(before.getLong("timeout") - now, 1000), + RANDOM.nextInt(1000000)); + } else { + // attempt to acquire the lock + NodeBuilder builder = before.builder(); + builder.setProperty("token", UUID.randomUUID().toString()); + builder.setProperty("timeout", now + timeout); + + SegmentNodeState after = + store.getWriter().writeNode(builder.getNodeState()); + if (setHead(before, after, info)) { + SegmentNodeState originalBase = base; + SegmentNodeState originalHead = head; + + // lock acquired; rebase, apply commit hooks, and unlock + rebase(); + builder.setChildNode(ROOT, hook.processCommit( + base.getChildNode(ROOT), head.getChildNode(ROOT))); + builder.removeProperty("token"); + builder.removeProperty("timeout"); + + // complete the commit + SegmentNodeState newHead = + store.getWriter().writeNode(builder.getNodeState()); + if (setHead(after, newHead, info)) { + base = newHead; + head = newHead; + return; + } else { + // something else happened, perhaps a timeout, so + // undo the previous rebase and try again + base = originalBase; + head = originalHead; + } + } + } + } + } + + @Nonnull + SegmentRootState merge(@Nonnull CommitHook hook, @Nullable CommitInfo info) + throws CommitFailedException { + checkNotNull(hook); + if (base != head) { + synchronized (RANDOM) { + try { + long timeout = optimisticMerge(hook, info); + if (timeout >= 0) { + pessimisticMerge(hook, timeout, info); + } + } catch (InterruptedException e) { + throw new CommitFailedException( + "Segment", 1, "Commit interrupted", e); + } finally { + RANDOM.notifyAll(); + } + } + } + return new SegmentRootState(head); + } + + } + }