Return-Path: X-Original-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Delivered-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2A2BBF6EC for ; Tue, 7 May 2013 11:17:02 +0000 (UTC) Received: (qmail 71795 invoked by uid 500); 7 May 2013 11:17:02 -0000 Delivered-To: apmail-jackrabbit-oak-commits-archive@jackrabbit.apache.org Received: (qmail 71719 invoked by uid 500); 7 May 2013 11:17:00 -0000 Mailing-List: contact oak-commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: oak-dev@jackrabbit.apache.org Delivered-To: mailing list oak-commits@jackrabbit.apache.org Received: (qmail 71661 invoked by uid 99); 7 May 2013 11:16:58 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 May 2013 11:16:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 May 2013 11:16:54 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id AB875238889B; Tue, 7 May 2013 11:16:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1479858 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: kernel/ plugins/segment/ Date: Tue, 07 May 2013 11:16:32 -0000 To: oak-commits@jackrabbit.apache.org From: jukka@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130507111632.AB875238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jukka Date: Tue May 7 11:16:32 2013 New Revision: 1479858 URL: http://svn.apache.org/r1479858 Log: OAK-775: Implement backward compatible observation Simplify Kernel- and SegmentNodeStore to make it easier to inject local observation support Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java?rev=1479858&r1=1479857&r2=1479858&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java Tue May 7 11:16:32 2013 @@ -22,12 +22,9 @@ import static org.apache.jackrabbit.oak. import static org.apache.jackrabbit.oak.api.Type.STRINGS; import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; import javax.jcr.PropertyType; -import org.apache.jackrabbit.mk.api.MicroKernel; import org.apache.jackrabbit.mk.json.JsopBuilder; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.PropertyState; @@ -35,50 +32,36 @@ import org.apache.jackrabbit.oak.commons import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * TODO document */ class JsopDiff implements NodeStateDiff { - private static final Logger log = LoggerFactory.getLogger(JsopDiff.class); - private final MicroKernel kernel; + private final KernelNodeStore store; protected final JsopBuilder jsop; protected final String path; - public JsopDiff(MicroKernel kernel, JsopBuilder jsop, String path) { - this.kernel = kernel; + JsopDiff(KernelNodeStore store, JsopBuilder jsop, String path) { + this.store = store; this.jsop = jsop; this.path = path; } - public JsopDiff(MicroKernel kernel) { - this(kernel, new JsopBuilder(), "/"); + JsopDiff(KernelNodeStore store) { + this(store, new JsopBuilder(), "/"); } public static void diffToJsop( - MicroKernel kernel, NodeState before, NodeState after, + KernelNodeStore store, NodeState before, NodeState after, String path, JsopBuilder jsop) { - after.compareAgainstBaseState(before, new JsopDiff(kernel, jsop, path)); - } - - public static String diffToJsop(NodeState before, NodeState after) { - JsopDiff diff = new JsopDiff(null) { - @Override - protected String writeBlob(Blob blob) { - return "Blob{" + Arrays.toString(blob.sha256()) + '}'; - } - }; - after.compareAgainstBaseState(before, diff); - return diff.toString(); + after.compareAgainstBaseState(before, new JsopDiff(store, jsop, path)); } protected JsopDiff createChildDiff(JsopBuilder jsop, String path) { - return new JsopDiff(kernel, jsop, path); + return new JsopDiff(store, jsop, path); } //-----------------------------------------------------< NodeStateDiff >-- @@ -194,24 +177,18 @@ class JsopDiff implements NodeStateDiff * @param blob blob to persist * @return id of the persisted blob */ - protected String writeBlob(Blob blob) { - String blobId; + private String writeBlob(Blob blob) { + KernelBlob kernelBlob; if (blob instanceof KernelBlob) { - blobId = ((KernelBlob) blob).getBinaryID(); + kernelBlob = (KernelBlob) blob; } else { - InputStream is = blob.getNewStream(); - blobId = kernel.write(is); - close(is); + try { + kernelBlob = store.createBlob(blob.getNewStream()); + } catch (IOException e) { + throw new IllegalStateException(e); + } } - return blobId; + return kernelBlob.getBinaryID(); } - private static void close(InputStream stream) { - try { - stream.close(); - } - catch (IOException e) { - log.warn("Error closing stream", e); - } - } } \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java?rev=1479858&r1=1479857&r2=1479858&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java Tue May 7 11:16:32 2013 @@ -64,7 +64,6 @@ public class KernelNodeStore extends Abs */ private KernelNodeState root; - public KernelNodeStore(final MicroKernel kernel, long cacheSize) { this.kernel = checkNotNull(kernel); this.cache = CacheBuilder.newBuilder() @@ -160,4 +159,12 @@ public class KernelNodeStore extends Abs } } + NodeState commit(String jsop, String baseRevision) { + return getRootState(kernel.commit("", jsop, baseRevision, null)); + } + + NodeState merge(String headRevision) { + return getRootState(kernel.merge(headRevision, null)); + } + } Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java?rev=1479858&r1=1479857&r2=1479858&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java Tue May 7 11:16:32 2013 @@ -43,10 +43,7 @@ class KernelNodeStoreBranch extends Abst private final KernelNodeStore store; /** Root state of the base revision of this branch */ - private NodeState base; - - /** Revision of the base state of this branch*/ - private String baseRevision; + private KernelNodeState base; /** Root state of the transient head revision on top of persisted branch, null if merged. */ private NodeState head; @@ -60,7 +57,6 @@ class KernelNodeStoreBranch extends Abst KernelNodeStoreBranch(KernelNodeStore store, KernelNodeState root) { this.store = store; this.base = root; - this.baseRevision = root.getRevision(); this.head = root; } @@ -151,25 +147,25 @@ class KernelNodeStoreBranch extends Abst head = null; // Mark as merged return base; } else { - MicroKernel kernel = store.getKernel(); - String newRevision; - JsopDiff diff = new JsopDiff(kernel); + NodeState newRoot; + JsopDiff diff = new JsopDiff(store); if (headRevision == null) { // no branch created yet, commit directly head.compareAgainstBaseState(base, diff); - newRevision = kernel.commit("", diff.toString(), baseRevision, null); + newRoot = store.commit(diff.toString(), base.getRevision()); } else { // commit into branch and merge head.compareAgainstBaseState(store.getRootState(headRevision), diff); String jsop = diff.toString(); if (!jsop.isEmpty()) { - headRevision = kernel.commit("", jsop, headRevision, null); + headRevision = store.getKernel().commit( + "", jsop, headRevision, null); } - newRevision = kernel.merge(headRevision, null); + newRoot = store.merge(headRevision); headRevision = null; } head = null; // Mark as merged - return store.getRootState(newRevision); + return newRoot; } } catch (MicroKernelException e) { head = oldRoot; @@ -186,7 +182,6 @@ class KernelNodeStoreBranch extends Abst // Nothing was written to this branch: set new base revision head = root; base = root; - baseRevision = root.getRevision(); } else if (headRevision == null) { // Nothing written to persistent branch yet // perform rebase in memory @@ -194,14 +189,12 @@ class KernelNodeStoreBranch extends Abst getHead().compareAgainstBaseState(getBase(), new ConflictAnnotatingRebaseDiff(builder)); head = builder.getNodeState(); base = root; - baseRevision = root.getRevision(); } else { // perform rebase in kernel persistTransientHead(); headRevision = store.getKernel().rebase(headRevision, root.getRevision()); head = store.getRootState(headRevision); base = root; - baseRevision = root.getRevision(); } } @@ -224,7 +217,7 @@ class KernelNodeStoreBranch extends Abst MicroKernel kernel = store.getKernel(); if (headRevision == null) { // create the branch if this is the first commit - headRevision = kernel.branch(baseRevision); + headRevision = kernel.branch(base.getRevision()); } // persist transient changes first @@ -235,14 +228,13 @@ class KernelNodeStoreBranch extends Abst } private void persistTransientHead() { - NodeState oldBase = base; - String oldBaseRevision = baseRevision; + KernelNodeState oldBase = base; NodeState oldHead = head; String oldHeadRevision = headRevision; boolean success = false; try { MicroKernel kernel = store.getKernel(); - JsopDiff diff = new JsopDiff(store.getKernel()); + JsopDiff diff = new JsopDiff(store); if (headRevision == null) { // no persistent branch yet if (head.equals(base)) { @@ -251,7 +243,7 @@ class KernelNodeStoreBranch extends Abst return; } else { // create branch - headRevision = kernel.branch(baseRevision); + headRevision = kernel.branch(base.getRevision()); head.compareAgainstBaseState(base, diff); } } else { @@ -274,7 +266,6 @@ class KernelNodeStoreBranch extends Abst // revert to old state if unsuccessful if (!success) { base = oldBase; - baseRevision = oldBaseRevision; head = oldHead; headRevision = oldHeadRevision; } Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1479858&r1=1479857&r2=1479858&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue May 7 11:16:32 2013 @@ -22,6 +22,8 @@ import java.io.InputStream; import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.spi.commit.EmptyObserver; +import org.apache.jackrabbit.oak.spi.commit.Observer; import org.apache.jackrabbit.oak.spi.state.AbstractNodeStore; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch; @@ -34,24 +36,37 @@ public class SegmentNodeStore extends Ab private final SegmentReader reader; + private final Observer observer; + + private SegmentNodeState root; + public SegmentNodeStore(SegmentStore store, String journal) { this.store = store; this.journal = store.getJournal(journal); this.reader = new SegmentReader(store); + this.observer = EmptyObserver.INSTANCE; + this.root = new SegmentNodeState(store, this.journal.getHead()); } public SegmentNodeStore(SegmentStore store) { this(store, "root"); } + boolean setHead(SegmentNodeState base, SegmentNodeState head) { + return journal.setHead(base.getRecordId(), head.getRecordId()); + } + @Override @Nonnull - public NodeState getRoot() { - return new SegmentNodeState(store, journal.getHead()); + public synchronized SegmentNodeState getRoot() { + NodeState before = root; + root = new SegmentNodeState(store, journal.getHead()); + observer.contentChanged(before, root); + return root; } @Override @Nonnull public NodeStoreBranch branch() { - return new SegmentNodeStoreBranch(store, journal); + return new SegmentNodeStoreBranch(this, new SegmentWriter(store)); } @Override Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1479858&r1=1479857&r2=1479858&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Tue May 7 11:16:32 2013 @@ -32,49 +32,46 @@ class SegmentNodeStoreBranch extends Abs private static final Random RANDOM = new Random(); - private final SegmentStore store; - - private final Journal journal; + private final SegmentNodeStore store; private final SegmentWriter writer; - private RecordId baseId; + private SegmentNodeState base; - private RecordId rootId; + private SegmentNodeState head; - SegmentNodeStoreBranch(SegmentStore store, Journal journal) { + SegmentNodeStoreBranch(SegmentNodeStore store, SegmentWriter writer) { this.store = store; - this.journal = journal; - this.writer = new SegmentWriter(store); - this.baseId = journal.getHead(); - this.rootId = baseId; + this.writer = writer; + this.base = store.getRoot(); + this.head = base; } @Override @Nonnull public NodeState getBase() { - return new SegmentNodeState(store, baseId); + return base; } @Override @Nonnull public synchronized NodeState getHead() { - return new SegmentNodeState(store, rootId); + return head; } @Override public synchronized void setRoot(NodeState newRoot) { - this.rootId = writer.writeNode(newRoot).getRecordId(); + head = writer.writeNode(newRoot); writer.flush(); } @Override public synchronized void rebase() { - RecordId newBaseId = journal.getHead(); - if (!baseId.equals(newBaseId)) { - NodeBuilder builder = - new SegmentNodeState(store, newBaseId).builder(); - getHead().compareAgainstBaseState(getBase(), new ConflictAnnotatingRebaseDiff(builder)); - this.baseId = newBaseId; - this.rootId = writer.writeNode(builder.getNodeState()).getRecordId(); + SegmentNodeState newBase = store.getRoot(); + if (!base.getRecordId().equals(newBase.getRecordId())) { + NodeBuilder builder = newBase.builder(); + head.compareAgainstBaseState( + base, new ConflictAnnotatingRebaseDiff(builder)); + base = newBase; + head = writer.writeNode(builder.getNodeState()); writer.flush(); } } @@ -82,23 +79,23 @@ class SegmentNodeStoreBranch extends Abs @Override @Nonnull public synchronized NodeState merge(CommitHook hook) throws CommitFailedException { - RecordId originalBaseId = baseId; - RecordId originalRootId = rootId; + SegmentNodeState originalBase = base; + SegmentNodeState originalHead = head; long backoff = 1; - while (baseId != rootId) { + while (base != head) { // apply commit hooks on the rebased changes - RecordId headId = writer.writeNode( - hook.processCommit(getBase(), getHead())).getRecordId(); + SegmentNodeState root = writer.writeNode( + hook.processCommit(base, head)); writer.flush(); // use optimistic locking to update the journal - if (journal.setHead(baseId, headId)) { - baseId = headId; - rootId = headId; + if (store.setHead(base, root)) { + base = root; + head = root; } else { // someone else was faster, so restore state and retry later - baseId = originalBaseId; - rootId = originalRootId; + base = originalBase; + head = originalHead; // use exponential backoff to reduce contention if (backoff < 10000) {