jackrabbit-oak-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Thomas Mueller <muel...@adobe.com>
Subject Re: svn commit: r1532782 [1/2] - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/core/ oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/ oak-core/src/main...
Date Thu, 17 Oct 2013 08:58:00 GMT
Hi,

When using multiple cluster nodes (using MongoMK), multiple cluster nodes
concurrently add or modify the node "/:commit-info". This results in a
conflict relatively quickly:

  The node 1:/:commit-info was changed in revision
  r141c598ec6d-0-4, which was applied after the base revision
  r141c598e754-0-3, before
  r141c598eacd-0-3


I suggest to change the mechanism. At least each cluster node should use a
different node name.

Regards,
Thomas


On 10/16/13 4:43 PM, "mduerig@apache.org" <mduerig@apache.org> wrote:

>Author: mduerig
>Date: Wed Oct 16 14:43:01 2013
>New Revision: 1532782
>
>URL: http://svn.apache.org/r1532782
>Log:
>OAK-1055: Occasional test failure in ObservationTest.observation
>OAK-1060: Periodically poll for external events
>- Remove PostCommitHook argument from NodeStore.merge and make
>ChangeDispatcher part of NodeStore implementations that implement
>Observable (i.e. support observation).
>- Use :commit-info on the root node to pass commit information across
>
>Added:
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jakrabbit/oak/plug
>ins/observation/CommitInfoEditorProvider.java
>Removed:
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>commit/PostCommitHook.java
>Modified:
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/AbstractRoot.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/AbstractTree.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/ContentRepositoryImpl.java
>
>jackrabit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/ContentSessionImpl.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/SystemRoot.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStore.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStoreBranch.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelRootBuilder.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdate.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/memory/MemoryNodeStore.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/mongomk/MongoNodeStore.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/ChangeDispatcher.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/ChangeProcessor.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/Observable.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStore.java
>
>jackrabbit/oak/trunk/oak-core/src/mainjava/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStoreBranch.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStoreService.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>lifecycle/OakInitializer.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>state/NodeStore.java
>
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>state/NodeStoreBranch.java
>
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeBuilderTest.java
>
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStateTest.java
>
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStoreCacheTest.java
>
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/LargeKernelNodeStateTest.java
>
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/NodeStoreTest.java
>
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdateTest.java
>
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plug
>ins/index/nodetype/NodeTypeIndexTest.java
>
>jackrabit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plug
>ins/segment/JournalTst.java
>
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plug
>ins/segment/MergeTest.java
>
>jackrabbit/oak/trunk/oak-upgrade/src/main/java/org/apache/jackrabbit/oak/u
>pgrade/RepositoryUpgrade.java
>
>Modified:
>jackrabit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/AbstractRoot.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/core/AbstractRoot.java?rev=1532782&r1=1532781&r2=
>1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/sc/main/java/org/apache/jackrabbit/oak/core
>/AbstractRoot.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jacrabbit/oak/core
>/AbstractRoot.java Wed Oct 16 14:43:01 2013
>@@ -47,7 +47,6 @@ import org.apache.jackrabbit.oak.spi.com
> import org.apace.jackrabbit.oak.spi.commit.CompositeHook;
> import org.apache.jackrabbit.oak.spi.commit.EditorHook;
> import or.apache.jackrabbit.oak.spi.commit.EmptyHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.commit.PostValidationHook;
> import org.apache.jackrabit.oak.spi.commit.ValidatorProvider;
> import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
>@@ -70,8 +69,6 @@ public abstract class AbstractRoot imple
>
>     private final CommitHook hook;
>
>-    private final PostCommitHook postHook;
>-
>     private final String workspaceName;
>
>     private final Subject subject;
>@@ -+126,12 @@ public abstract class AbstractRoot imple
>      /
>     protected AbstractRoot(NodeStore store,
>             CommitHook hook,
>-            PostCommitHook postHook,
>             String workspaceName,
>             Subject subject,
>             SecurityProvider securityProvider,
             QueryIndexProvider indexProvider) {
>         this.store = checkNotNull(store);
>         this.hok = checkNotNull(hook);
>-        this.postHook = postHook;
>         this.workspaceName = checkNotNull(workspaceName);
>         this.subject = checkNotNull(subject);
>         this.securityProvider = checkNotNull(securityProvider);
>@@ -245,7 +240,7 @@ public abstract class AbstractRoot imple
>     @Override
>     public void commit(final CommitHook... hooks) throws
>CommitFailedException {
>         checkLive();
>-        base = store.merge(builder, getCommitHook(hooks), postHook);
>+        base = store.merge(builder, getCommitHook(hooks));
>         secureBuilder.baseChanged();
>         modCount = 0;
>         if (permissionProvider.hasValue()) {
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/AbstractTree.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/core/AbstractTree.java?rev=1532782&r1=1532781&r2=
>1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/AbstractTree.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/AbstractTree.java Wed Oct 16 14:43:01 2013
>@@ -19,7 +19,18 @@
>
> package org.apache.jackrabbit.oak.core;
>
>+import static com.google.common.base.Preconditions.checkNotNull;
>+import static com.google.common.collect.Iterables.filter;
>+import static com.google.common.collect.Iterables.size;
>+import static com.google.common.collect.Iterables.transform;
>+import static org.apache.jackrabbit.oak.api.Tree.Status.EXISTING;
>+import static org.apache.jackrabbit.oak.api.Tree.Status.MODIFIED;
>+import static org.apache.jackrabbit.oak.api.Tree.Status.NEW;
>+import static org.apache.jackrabbit.oak.api.Type.STRING;
>+import static
>org.apache.jackrabbit.oak.spi.state.NodeStateUtils.isHidden;
>+
> import java.util.Iterator;
>+
> import javax.annotation.Nonnull;
>
> import com.google.common.base.Function;
>@@ -28,19 +39,10 @@ import org.apache.jackrabbit.mk.api.Micr
> import org.apache.jackrabbit.oak.api.PropertyState;
> import org.apache.jackrabbit.oak.api.Tree;
> import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
>+import
>org.apache.jackrabbit.oak.plugins.observation.CommitInfoEditorProvider;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
>
>-import static com.google.common.base.Preconditions.checkNotNull;
>-import static com.google.common.collect.Iterables.filter;
>-import static com.google.common.collect.Iterables.size;
>-import static com.google.common.collect.Iterables.transform;
>-import static org.apache.jackrabbit.oak.api.Tree.Status.EXISTING;
>-import static org.apache.jackrabbit.oak.api.Tree.Status.MODIFIED;
>-import static org.apache.jackrabbit.oak.api.Tree.Status.NEW;
>-impot static org.apache.jackrabbit.oak.api.Type.STRING;
>-import static
>org.apache.jackrabbit.oak.spi.state.NodeStateUtils.isHidden;
>-
> /**
>  * {@code AbstractTree} provides default implementations for most
>  * read methods of {@code Tree}. Furthermore it handles the
>@@ -55,7 +57, @@ public abstract class AbstractTree imple
>     public static final String OAK_CHILD_ORDER = ":childOrder";
>
>     // TODO: make this configurable
>-    private static final String[] INTERNAL_NODE_NAMES=
>{IndexConstants.INDEX_CONTENT_NODE_NAME, MicroKernel.CONFLICT_NAME};
>+    private static final String[] INTERNAL_NODE_NAMES = {
>+            IndexConstants.INDEX_CONTENT_NODE_NAME,
>+            MicroKernel.CONFLICT_NAME,
>CommiInfoEditorProvider.COMMIT_INFO};
>
>     /**
>      * Name of this tree
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/ContentRepositoryImpl.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/core/ContentRepositoryImpl.java?rev=1532782&r1=15
>32781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/ContentRepositoryImpl.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/ContentRepositoryImpl.java Wed Oct 16 14:43:01 2013
>@@ -26,7 +26,6 @@ import javax.security.auth.login.LoginEx
>
> import org.apache.jackrabbit.oak.api.ContentRepository;
> import org.apache.jackrabbit.oak.api.ContentSession;
>-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
> import org.apache.jackrabbit.oak.spi.query.CompositeQueryIndexProvider;
> import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
>@@ -47,7 +46,6 @@ public class ContentRepositoryImpl imple
>     private final String defaultWorkspaceName;
>     private final SecurityProvider securityProvider;
>     private final QueryIndexProvider indexProvider;
>-    private final ChangeDispatcher changeDispatcher;
>
>     /**
>      * Creates an content repository instance based on the given, already
>@@ -69,7 +67,6 @@ public class ContentRepositoryImpl imple
>         this.defaultWorkspaceName = checkNotNull(defaultWorkspaceName);
>         this.securityProvider = checkNotNull(securityProvider);
>         this.indexProvider = indexProvider != null ? indexProvider : new
>CompositeQueryIndexProvider();
>-        this.changeDispacher = new ChangeDispatcher(nodeStore);
>     }
>
>     @Nonnull
>@@ -89,8 +86,8 @@ public class ContentRepositoryImpl imple
>         LoginContext loginContext =
>lcProvider.getLoginContext(credentials, workspaceName);
>         loginContext.login();
>
>-        return new ContentSessionImpl(loginContext, securityProvider,
>workspaceName,
>-                nodeStore, commitHook, changeDispatcher, indexProvider);
>+        reurn new ContentSessionImpl(loginContext, securityProvider,
>workspaceName, nodeStore,
>+                commitHook, indexProvider);
>     }
>
>     public NodeStore getNodeStore() {
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/ContentSessionImpl.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/core/ContentSessionImpl.java?rev=1532782&r1=15327
>81&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/ContentSessionImpl.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/ContentSessionImpl.java Wed Oct 16 14:43:01 2013
>@@ -16,6 +16,7 @@
>  */
> package org.apache.jackrabbit.oak.core;
>
>+import static com.google.common.base.Preconditions.checkArgument;
> import static com.google.common.base.Preconditions.checkState;
>
> import java.io.IOException;
>@@ -28,10 +29,12 @@ import javax.security.auth.login.LoginEx
> import org.apache.jackrabbit.oak.api.AuthInfo;
> import org.apache.jackrabbit.oak.api.ContentSession;
> import org.apache.jackrabbit.oak.api.Root;
>-import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
> import
>org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
>+import
>org.apache.jackrabbit.oak.plugins.observation.CommitInfoEditorProvider;
> import org.apache.jackrabbit.oak.plugins.observation.Observable;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>+import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
>+import org.apache.jackrabbit.ok.spi.commit.EditorHook;
> import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
> import org.apache.jackrabbit.oak.spi.security.SecurityProvider;> import
>org.apache.jackrabbit.oak.spi.security.authentication.LoginContext;
>@@ -56,7 +59,6 @@ class ContentSessionImpl implements Cont
>    private final String workspaceName;
>     private final NodeStore store;
>     private final CommitHook hook;
>-    private final ChangeDispatcher changeDispatcher;
>     private final QueryIndexProvider indexProvider;
>     private final String sessionName;
>
>@@ -67,14 +69,13 @@ class ContentSessionImpl implements Cont
>                               @Nonnull String workspaceName,
>                               @Nonnull NodeStore store,
>                               @Nonnull CommitHook hook,
>-                              @Nonnull ChangeDispatcher changeDispatcher,
>                               @Nonnull QueryIndexProvider indexProvider)
>{
>+        checkArgument(store instanceof Observable);
>         this.loginContext = loginContext;
>         this.securityProvider = securityProvider;
>         this.workspaceName = workspaceName;
>         this.store = store;
>         this.hook = hook;
>-        this.changeDispatcher = changeDispatcher;
>         this.indexProvider = indexProvider;
>         this.sessionName = "session-" +
>SESSION_COUNTER.incrementAndGet();
>     }
>@@ -105,8 +106,12 @@ class ContentSessionImpl implements Cont
>     @Override
>     public Root getLatestRoot() {
>         checkLive();
>-        return new AbstractRoot(store, hook,
>changeDispatcher.newHook(ContentSessionImpl.this), workspaceName,
>-                loginContext.getSubject(), securityrovider,
>indexProvider) {
>+
>+        EditorHook commitInfoEditor = new EditorHook(
>+                new CommitInfoEditorProvider(sessionName,
>getAuthInfo().getUserID()));
>+
>+        return new AbstractRoot(store, new CompositeHook(hook,
>commitInfoEditor),
>+                workspaceNam, loginContext.getSubject(),
>securityProvider, indexProvider) {
>             @Overrie
>             protected void checkLive() {
>                 ContentSessionImpl.this.checkLive();
>@@ -121,7 +126,7 @@ class ContentSessionImpl implements Cont
>
>     @Override
>     public Listener newListener() {
>-        return changeDispatcher.newListener();
>+        return ((Observable) store).newListener();
>     }
>
>     //-----------------------------------------------------------<
>Closable >---
>@@ -139,4 +144,5 @@ class ContentSessionImpl implements Cont
>     public String toString() {
>         return sessionName;
>     }
>+
> }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/SystemRoot.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/core/SystemRoot.java?rev=1532782&r1=1532781&r2=15
>32782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/SystemRoot.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core
>/SystemRoot.java Wed Oct 16 14:43:01 2013
>@@ -23,7 +23,6 @@ import org.apache.jackrabbit.oak.api.Roo
> import org.apache.jackrabbit.oak.security.authentication.SystemSubject;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
> import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
>-import org.aache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.query.CompositeQueryIndexProvider;
> import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
> import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
>@@ -41,8 +40,7 @@ public class SystemRoot extends Abstract
>     publi SystemRoot(final NodeStore store, final CommitHook hook,
>final String workspaceName,
>             final SecurityProvider securityProvider, final
>QueryIndexProvider indexProvider) {
>
>-        super(store, hook, PostCommitHook.EMPTY, workspaceName,
>-                SystemSubject.INSTANCE, securityProvider, indexProvider);
>+        super(store, hook, workspaceName, SystemSubject.INSTANCE,
>securityProvider, indexProvider);
>
>         contentSession = new ContentSession() {
>             private final AuthInfoImpl authInfo = new AuthInfoImpl(
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStore.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/kernel/KernelNodeStore.java?rev=1532782&r1=153278
>1&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStore.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStore.java Wed Oct 16 14:43:01 2013
>@@ -38,10 +38,12 @@ import org.apache.jackrabbit.mk.api.Micr
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.cache.CacheLIRS;
> import org.apache.jackrabbit.oak.cache.CacheStats;
>+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
>+import
>org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
>+import org.apache.jackrabbit.oak.plugins.observation.Observable;
> import org.apche.jackrabbit.oak.spi.commit.CommitHook;
> import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
> import org.apache.jackrabbit.oak.spi.commit.Observer;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
>@@ -50,7 +52,7 @@ import org.apache.jackrabbit.oak.spi.sta
> /**
>  * {@code NodeStore} implementations against {@link MicroKernel}.
>  */
>-public class KernelNodeStore implements NodeStore {
>+public class KernelNodeStore implements NodeStore, Observable {
>
>     private static final long DEFAULT_CACHE_SIZE = 16 * 1024 * 1024;
>
>@@ -74,6 +76,8 @@ public class KernelNodeStore implements
>      */
>     private final Lock mergeLock = new ReentrantLock();
>
>+    private final ChangeDispatcher changeDispatcher;
>+
>     /**
>      * State of the current root node.
>      */
>@@ -120,6 +124,7 @@ public class KernelNodeStore implements
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>+        changeDispatcher = new ChangeDispatcher(this);
>     }
>
>     public KernelNodeStore(MicroKernel kernel) {
>@@ -143,6 +148,13 @@ public class KernelNodeStore implements
>         return getRoot().toString();
>     }
>
>+    //------------------------------------------------------------<
>Observable >---
>+
>+    @Override
>+    public Listener newListener() {
>+        return changeDispatcher.newListener();
>+    }
>+
>     //----------------------------------------------------------<
>NodeStore >---
>
>     @Override
>@@ -157,15 +169,15 @@ public class KernelNodeStore implements
>     }
>
>     /**
>-     * This implementation delegates to {@link
>KernelRootBuilder#merge(CommitHook, PostCommitHook)}
>+     * This implementation delegates to {@link
>KernelRootBuilder#merge(CommitHook)}
>      * if {@code builder} is a {@link KernelNodeBuilder} instance.
>Otherwise it throws
>      * an {@code IllegalArgumentException}.
>      */
>     @Override
>-    public NodeState merge(@Nonnull NodeBuilder builder, @Nonnull
>CommitHook commitHook,
>-            PostCommitHook committed) throws CommitFailedException {
>+    public NodeState merge(@Nonnull NodeBuilder builder, @Nonnull
>CommitHook commitHook)
>+            throws CommitFailedException {
>         checkArgument(builder instanceof KernelRootBuilder);
>-        return ((KernelRootBuilder) builder).merge(commitHook,
>committed);
>+        return ((KernelRootBuilder) builder).merge(commitHook);
>     }
>
>     /**
>@@ -265,4 +277,16 @@ public class KernelNodeStore implements
>         return getRootState(kernel.merge(branchHead.getRevision(),
>null));
>     }
>
>+    void beforeCommit(NodeState root) {
>+        changeDispatcher.beforeCommit(root);
>+    }
>+
>+    void localCommit(NodeState root) {
>+        changeDispatcher.localCommit(root);
>+    }
>+
>+    void afterCommit(NodeState root) {
>+        changeDispatcher.afterCommit(root);
>+    }
>+
> }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStoreBranch.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabit/oak/kernel/KernelNodeStoreBranch.java?rev=1532782&r1=
>1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStoreBranch.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStoreBranch.java Wed Oct 16 14:43:01 2013
>@@ -30,7 +30,6 @@ import org.apache.jackrabbit.mk.api.Micr
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.commons.PathUtils;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
>@@ -132,8 +131,8 @@ class KernelNodeStoreBranch implements N
>
>     @Nonnull
>     @Override
>-    public NodeState merge(@Nonnull CommitHook hook, PostCommitHook
>committed) throws CommitFailedException {
>-        return branchState.merge(checkNotNull(hook),
>checkNotNull(committed));
>+    public NodeState merge(@Nonnull CommitHook hook) throws
>CommitFailedException {
>+        return branchState.merge(checkNotNull(hook));
>     }
>
>     @Override
>@@ -181,7 +180,7 @@ class KernelNodeStoreBranch implements N
>         abstract void rebase();
>
>         @Nonnull
>-        abstract NodeState merge(@Nonnull CommitHook hook,
>PostCommitHook committed) throws CommitFailedException;
>+        abstract NodeState merge(@Nonnull CommitHook hook) throws
>CommitFailedException;
>     }
>
>     /**
>@@ -191,7 +190,7 @@ class KernelNodeStoreBranch implements N
>      * <ul>
>      *     <li>{@link InMemory} on {@link #setRoot(NodeState)} if the
>new root differs
>      *         from the current base</li>.
>-     *     <li>{@link Merged} on {@link #merge(CommitHook,
>PostCommitHook)}</li>
>+     *     <li>{@link Merged} on {@link #merge(CommitHook)}</li>
>      * </ul>
>      */
>     private class Unmodified extends BranchState {
>@@ -222,7 +221,7 @@ class KernelNodeStoreBranch implements N
>         }
>
>         @Override
>-        NodeState merge(CommitHook hook, PostCommitHook committed)
>throws CommitFailedException {
>+        NodeState merge(CommitHook hook) throws CommitFailedException {
>             branchState = new Merged(base);
>             return base;
>         }
>@@ -237,7 +236,7 @@ class KernelNodeStoreBranch implements N
>      *     <li>{@link Unmodified} on {@link #setRoot(NodeState)} if the
>new root is the same
>      *         as the base of this branch or
>      *     <li>{@link Persisted} otherwise.
>-     *     <li>{@link Merged} on {@link #merge(CommitHook,
>PostCommitHook)}</li>
>+     *     <li>{@link Merged} on {@link #merge(CommitHook)}</li>
>      * </ul>
>      */
>     private class InMemory extends BranchState {
>@@ -279,15 +278,16 @@ class KernelNodeStoreBranch implements N
>         }
>
>         @Override
>-        NodeState merge(CommitHook hook, PostCommitHook committed)
>throws CommitFailedException {
>+        NodeState merge(CommitHook hook) throws CommitFailedException {
>             mergeLock.lock();
>             try {
>                 rebase();
>+                store.beforeCommit(base);
>                 NodeState toCommit =
>checkNotNull(hook).processCommit(base, head);
>                 JsopDiff diff = new JsopDiff(store);
>                 toCommit.compareAgainstBaseState(base, diff);
>                 NodeState newHead = store.commit(diff.toString(), base);
>-                committed.contentChanged(base, newHead);
>+                store.localCommit(newHead);
>                 branchState = new Merged(base);
>                 return newHead;
>             } catch (MicroKernelException e) {
>@@ -295,6 +295,7 @@ class KernelNodeStoreBranch implements N
>                         "Kernel", 1,
>                         "Failed to merge changes to the underlying
>MicroKernel", e);
>             } finally {
>+                store.afterCommit(store.getRoot());
>                 mergeLock.unlock();
>             }
>         }
>@@ -308,7 +309,7 @@ class KernelNodeStoreBranch implements N
>      * <ul>
>      *     <li>{@link Unmodified} on {@link #setRoot(NodeState)} if the
>new root is the same
>      *         as the base of this branch.
>-     *     <li>{@link Merged} on {@link #merge(CommitHook,
>PostCommitHook)}</li>
>+     *     <li>{@link Merged} on {@link #merge(CommitHook)}</li>
>      * </ul>
>      */
>     private class Persisted extends BranchState {
>@@ -361,13 +362,13 @@ class KernelNodeStoreBranch implements N
>         }
>
>         @Override
>-        NodeState merge(CommitHook hook, PostCommitHook committed)
>throws CommitFailedException {
>+        NodeState merge(CommitHook hook) throws CommitFailedException {
>             mergeLock.lock();
>             try {
>                 rebase();
>+                store.beforeCommit(base);
>                 NodeState toCommit =
>checkNotNull(hook).processCommit(base, head);
>                 if (toCommit.equals(base)) {
>-                    committed.contentChanged(base, base);
>                     branchState = new Merged(base);
>                     return base;
>                 } else {
>@@ -375,7 +376,7 @@ class KernelNodeStoreBranch implements N
>                     toCommit.compareAgainstBaseState(head, diff);
>                     commit(diff.toString());
>                     NodeState newRoot = store.merge(head);
>-                    committed.contentChanged(base, newRoot);
>+                    store.localCommit(newRoot);
>                     branchState = new Merged(base);
>                     return newRoot;
>                 }
>@@ -384,6 +385,7 @@ class KernelNodeStoreBranch implements N
>                         "Kernel", 1,
>                         "Failed to merge changes to the underlying
>MicroKernel", e);
>             } finally {
>+                store.afterCommit(store.getRoot());
>                 mergeLock.unlock();
>             }
>         }
>@@ -429,7 +431,7 @@ class KernelNodeStoreBranch implements N
>         }
>
>         @Override
>-        NodeState merge(CommitHook hook, PostCommitHook committed)
>throws CommitFailedException {
>+        NodeState merge(CommitHook hook) throws CommitFailedException {
>             throw new IllegalStateException("Branch has already been
>merged");
>         }
>     }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelRootBuilder.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/kernel/KernelRootBuilder.java?rev=1532782&r1=1532
>781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelRootBuilder.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kern
>el/KernelRootBuilder.java Wed Oct 16 14:43:01 2013
>@@ -21,7 +21,6 @@ import static com.google.common.base.Pre
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
> import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
>
>@@ -132,9 +131,9 @@ class KernelRootBuilder extends MemoryNo
>     /**
>      * Merge all changes tracked in this builder into the underlying
>store.
>      */
>-    NodeState merge(CommitHook hook, PostCommitHook committed) throws
>CommitFailedException {
>+    NodeState merge(CommitHook hook) throws CommitFailedException {
>         purge();
>-        branch.merge(hook, committed);
>+        branch.merge(hook);
>         return reset();
>     }
>
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdate.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1532782&r
>1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdate.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdate.java Wed Oct 16 14:43:01 2013
>@@ -26,21 +26,19 @@ import java.util.concurrent.TimeUnit;
>
> import javax.annotation.Nonnull;
>
>+import com.google.common.base.Objects;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.api.PropertyState;
> import org.apache.jackrabbit.oak.api.Type;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
> import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
> import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
>-import com.google.common.base.Objects;
>-
> public class AsyncIndexUpdate implements Runnable {
>
>     private static final Logger log = LoggerFactory
>@@ -119,7 +117,7 @@ public class AsyncIndexUpdate implements
>                             throw CONCURRENT_UPDATE;
>                         }
>                     }
>-                }, PostCommitHook.EMPTY);
>+                });
>             } catch (CommitFailedException e) {
>                 if (e != CONCURRENT_UPDATE) {
>                     exception = e;
>@@ -144,7 +142,7 @@ public class AsyncIndexUpdate implements
>         NodeBuilder builder = store.getRoot().builder();
>         preAsyncRunStatus(builder);
>         try {
>-            store.merge(builder, EmptyHook.INSTANCE,
>PostCommitHook.EMPTY);
>+            store.merge(builder, EmptyHook.INSTANCE);
>         } catch (CommitFailedException e) {
>             log.warn("Index status update {} failed", name, e);
>         }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/memory/MemoryNodeStore.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/memory/MemoryNodeStore.java?rev=1532782&r
>1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/memory/MemoryNodeStore.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/memory/MemoryNodeStore.java Wed Oct 16 14:43:01 2013
>@@ -33,7 +33,6 @@ import javax.annotation.Nonnull;
> import com.google.common.io.ByteStreams;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
>@@ -75,21 +74,20 @@ public class MemoryNodeStore implements
>      * new branch and immediately merging it back.
>      * @param builder  the builder whose changes to apply
>      * @param commitHook the commit hook to apply while merging changes
>-     * @param committed  the pos commit hook
>      * @return the node state resulting from the merge.
>      * @throws CommitFailedException
>      * @throws IllegalArgumentException if the builder is not acquired
>from a root state of
>      *                                  this store
>      */
>     @Override
>-    public synchronized NodeState merge(@Nonnull NodeBuilder builder,
>@Nonnull CommitHook commitHook,
>-            PostCommitHook committed) throws CommitFailedException {
>+    public synchronized NodeState merge(@Nonnull NodeBuilder builder,
>@Nonnull CommitHook commitHook)
>+            throws CommitFailedException {
>         checkArgument(builder instanceof MemoryNodeBuilder);
>         checkNotNull(commitHook);
>         rebase(checkNotNull(builder));
>         NodeStoreBranch branch = new MemoryNodeStoreBranch(this,
>getRoot());
>         branch.setRoot(builder.getNodeState());
>-        NodeState merged = branch.merge(commitHook, committed);
>+        NodeState merged = branch.merge(commitHook);
>         ((MemoryNodeBuilder) builder).reset(merged);
>         return merged;
>     }
>@@ -198,16 +196,12 @@ public class MemoryNodeStore implements
>         }
>
>         @Override
>-        public NodeState merge(CommitHook hook, PostCommitHook
>committed) throws CommitFailedException {
>+        public NodeState merge(CommitHook hook) throws
>CommitFailedException {
>             // TODO: rebase();
>             checkNotMerged();
>             NodeState merged =
>ModifiedNodeState.squeeze(checkNotNull(hook).processCommit(base, root));
>-            synchronized (this) {
>-                // FIXME temporarily synchronized to work around the
>race described in OAK-1055
>-                store.root.set(merged);
>-                root = null; // Mark as merged
>-                committed.contentChanged(base, merged);
>-            }
>+            store.root.set(merged);
>+            root = null; // Mark as merged
>             return merged;
>         }
>
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/mongomk/MongoNodeStore.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1532782&r
>1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/mongomk/MongoNodeStore.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/mongomk/MongoNodeStore.java Wed Oct 16 14:43:01 2013
>@@ -16,6 +16,9 @@
>  */
> package org.apache.jackrabbit.oak.plugins.mongomk;
>
>+import static com.google.common.base.Preconditions.checkNotNull;
>+import static com.google.common.base.Preconditions.checkState;
>+
> import java.io.IOException;
> import java.io.InputStream;
> import java.lang.ref.WeakReference;
>@@ -37,6 +40,10 @@ import java.util.concurrent.atomic.Atomi
> import javax.annotation.CheckForNull;
> import javax.annotation.Nonnull;
>
>+import com.google.common.base.Function;
>+import com.google.common.cache.Cache;
>+import com.google.common.collect.Iterables;
>+import com.google.common.collect.Maps;
> import org.apache.jackrabbit.mk.api.MicroKernelException;
> import org.apache.jackrabbit.oak.api.Blob;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
>@@ -46,21 +53,12 @@ import org.apache.jackrabbit.oak.plugins
> import
>org.apache.jackrabbit.oak.plugins.mongomk.util.TimingDocumentStoreWrapper;
> import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
>-import com.google.common.base.Function;
>-import com.google.common.cache.Cache;
>-import com.google.common.collect.Iterables;
>-import com.google.common.collect.Maps;
>-
>-import static com.google.common.base.Preconditions.checkNotNull;
>-import static com.google.common.base.Preconditions.checkState;
>-
> /**
>  * Implementation of a NodeStore on MongoDB.
>  */
>@@ -664,8 +662,7 @@ public final class MongoNodeStore implem
>     @Nonnull
>     @Override
>     public NodeState merge(@Nonnull NodeBuilder builder,
>-                           @Nonnull CommitHook commitHook,
>-                           PostCommitHook committed)
>+                           @Nonnull CommitHook commitHook)
>             throws CommitFailedException {
>         // TODO: implement
>         return null;
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/ChangeDispatcher.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java?rev=153
>2782&r1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/ChangeDispatcher.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/ChangeDispatcher.java Wed Oct 16 14:43:01 2013
>@@ -20,18 +20,21 @@ package org.apache.jackrabbit.oak.plugin
>
> import static com.google.common.base.Objects.toStringHelper;
> import static com.google.common.base.Preconditions.checkNotNull;
>-import static
>org.apache.jackrabbit.oak.plugins.observation.ObservationConstants.OAK_UNK
>NOWN;
>+import static com.google.common.base.Preconditions.checkState;
>+import static org.apache.jackrabbit.oak.api.Type.LONG;
>+import static org.apache.jackrabbit.oak.api.Type.STRING;
>
> import java.util.Queue;
> import java.util.Set;
>+import java.util.concurrent.atomic.AtomicLong;
>
> import javax.annotation.CheckForNull;
> import javax.annotation.Nonnull;
>
>+import com.google.common.base.Objects;
> import com.google.common.collect.Queues;
> import com.google.common.collect.Sets;
>-import org.apache.jackrabbit.oak.api.ContentSession;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
>+import org.apache.jackrabbit.oak.api.PropertyState;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
>
>@@ -39,19 +42,27 @@ import org.apache.jackrabbit.oak.spi.sta
>  * A {@code ChangeDispatcher} instance records changes to a {@link
>NodeStore}
>  * and dispatches them to interested parties.
>  * <p>
>- * The {@link #newHook(ContentSession)} method registers a hook for
>- * reporting changes. Actual changes are reported by calling
>- * {@link Hook#contentChanged(NodeState, NodeState)}. Such changes are
>considered
>- * to have occurred on the local cluster node and are recorded as such.
>Changes
>- * that occurred in-between calls to any hook registered with a change
>processor
>- * are considered to have occurred on a different cluster node and are
>recorded as such.
>+ * Actual changes are reported by calling {@link
>#beforeCommit(NodeState)},
>+ * {@link #localCommit(NodeState)} and {@link #afterCommit(NodeState)}
>in that order:
>+ * <pre>
>+      NodeState root = store.getRoot();
>+      branch.rebase();
>+      changeDispatcher.beforeCommit(root);
>+      try {
>+          NodeState head = branch.getHead();
>+          branch.merge();
>+          changeDispatcher.localCommit(head);
>+      } finally {
>+          changeDispatcher.afterCommit(store.getRoot());
>+      }
>+ * </pre>
>  * <p>
>- * The {@link #newListener()} registers a listener for receiving changes
>reported
>- * into a change dispatcher by any of its hooks.
>+ * The {@link #newListener()} method registers a listener for receiving
>changes reported
>+ * into a change dispatcher.
>  */
> public class ChangeDispatcher {
>-    private final NodeStore store;
>     private final Set<Listener> listeners = Sets.newHashSet();
>+    private final NodeStore store;
>
>     private NodeState previousRoot;
>
>@@ -65,20 +76,6 @@ public class ChangeDispatcher {
>     }
>
>     /**
>-     * Create a new {@link Hook} for reporting changes occurring in the
>-     * passed {@code contentSession}. The content session is used to
>-     * determine the user associated with the changes recorded through
>this
>-     * hook and to determine the originating session of changes.
>-     * @param contentSession  session which will be associated with any
>changes reported
>-     *                        through this hook.
>-     * @return a new {@code Hook} instance
>-     */
>-    @Nonnull
>-    public Hook newHook(ContentSession contentSession) {
>-        return new Hook(contentSession);
>-    }
>-
>-    /**
>      * Create a new {@link Listener} for receiving changes reported into
>      * this change dispatcher. Listeners need to be {@link
>Listener#dispose() disposed}
>      * when no longer needed.
>@@ -91,21 +88,83 @@ public class ChangeDispatcher {
>         return listener;
>     }
>
>-    private synchronized void contentChanged(@Nonnull NodeState before,
>@Nonnull NodeState after,
>-            ContentSession contentSession) {
>-        externalChange(checkNotNull(before));
>-        internalChange(checkNotNull(after), contentSession);
>+    private final AtomicLong changeCount = new AtomicLong(0);
>+
>+    private boolean inLocalCommit() {
>+        return changeCount.get() % 2 == 1;
>+    }
>+
>+    /**
>+     * Call with the latest persisted root node state right before
>persisting further changes.
>+     * Calling this method marks this instance to be inside a local
>commit.
>+     * <p>
>+     * The differences from the root node state passed to the last call
>to
>+     * {@link #afterCommit(NodeState)} to {@code root} are reported as
>cluster external
>+     * changes to any listener.
>+     *
>+     * @param root  latest persisted root node state.
>+     * @throws IllegalStateException  if inside a local commit
>+     */
>+    public synchronized void beforeCommit(@Nonnull NodeState root) {
>+        checkState(!inLocalCommit());
>+        changeCount.incrementAndGet();
>+        externalChange(checkNotNull(root));
>+    }
>+
>+    /**
>+     * Call right after changes have been successfully persisted passing
>the new root
>+     * node state resulting from the persist operation.
>+     * <p>
>+     * The differences from the root node state passed to the last call
>to
>+     * {@link #beforeCommit(NodeState)} to {@code root} are reported as
>cluster local
>+     * changes to any listener.
>+
>+     * @param root  root node state just persisted
>+     * @throws IllegalStateException  if not inside a local commit
>+     */
>+    public synchronized void localCommit(@Nonnull NodeState root) {
>+        checkState(inLocalCommit());
>+        internalChange(checkNotNull(root));
>+    }
>+
>+    /**
>+     * Call to mark the end of a persist operation passing the latest
>persisted root node state.
>+     * Calling this method marks this instance to not be inside a local
>commit.
>+     * <p>
>+     * The difference from the root node state passed to the las call to
>+     * {@link #localCommit(NodeState)} to {@code root} are reported as
>cluster external
>+     * changes to any listener.
>+
>+     * @param root  latest persisted root node state.
>+     * @throws IllegalStateException  if not inside a local commit
>+     */
>+    public synchronized void afterCommit(@Nonnull NodeState root) {
>+        checkState(inLocalCommit());
>+        externalChange(checkNotNull(root));
>+        changeCount.incrementAndGet();
>+    }
>+
>+    private void externalChange() {
>+        if (!inLocalCommit()) {
>+            long c = changeCount.get();
>+            NodeState root = store.getRoot();  // Need to get root
>outside sync. See OAK-959
>+            synchronized (this) {
>+                if (c == changeCount.get() && !inLocalCommit()) {
>+                    externalChange(root);
>+                }
>+            }
>+        }
>     }
>
>     private synchronized void externalChange(NodeState root) {
>         if (!root.equals(previousRoot)) {
>-            add(ChangeSet.external(previousRoot, root));
>+            add(new ChangeSet(previousRoot, root, true));
>             previousRoot = root;
>         }
>     }
>
>-    private synchronized void internalChange(NodeState root,
>ContentSession contentSession) {
>-        add(ChangeSet.local(previousRoot, root, contentSession));
>+    private synchronized void internalChange(NodeState root) {
>+        add(new ChangeSet(previousRoot, root, false));
>         previousRoot = root;
>     }
>
>@@ -133,28 +192,6 @@ public class ChangeDispatcher {
>         }
>     }
>
>-    //------------------------------------------------------------< Sink
>>---
>-
>-    /**
>-     * Hook for reporting changes. Actual changes are reported by calling
>-     * {@link Hook#contentChanged(NodeState, NodeState)}. Such changes
>are considered
>-     * to have occurred on the local cluster node and are recorded as
>such. Changes
>-     * that occurred in-between calls to any hook registered with a
>change processor
>-     * are considered to have occurred on a different cluster node and
>are recorded as such.
>-     */
>-    public class Hook implements PostCommitHook {
>-        private final ContentSession contentSession;
>-
>-        private Hook(ContentSession contentSession) {
>-            this.contentSession = contentSession;
>-        }
>-
>-        @Override
>-        public void contentChanged(@Nonnull NodeState before, @Nonnull
>NodeState after) {
>-            ChangeDispatcher.this.contentChanged(before, after,
>contentSession);
>-        }
>-    }
>-
>     //------------------------------------------------------------<
>Listener >---
>
>     /**
>@@ -177,6 +214,10 @@ public class ChangeDispatcher {
>          */
>         @CheckForNull
>         public ChangeSet getChanges() {
>+            if (changeSets.isEmpty()) {
>+                externalChange();
>+            }
>+
>             return changeSets.isEmpty() ? null : changeSets.remove();
>         }
>
>@@ -193,55 +234,45 @@ public class ChangeDispatcher {
>      * on the local cluster node, the user causing the changes and the
>date the changes
>      * where persisted.
>      */
>-    public abstract static class ChangeSet {
>+    public static class ChangeSet {
>         private final NodeState before;
>         private final NodeState after;
>+        private final boolean isExternal;
>
>-        static ChangeSet local(NodeState base, NodeState head,
>ContentSession contentSession) {
>-            return new InternalChangeSet(base, head, contentSession,
>System.currentTimeMillis());
>+        ChangeSet(NodeState before, NodeState after, boolean isExternal)
>{
>+            this.before = before;
>+            this.after = after;
>+            this.isExternal = isExternal;
>         }
>
>-        static ChangeSet external(NodeState base, NodeState head) {
>-            return new ExternalChangeSet(base, head);
>+        public boolean isExternal() {
>+            return isExternal;
>         }
>
>-        protected ChangeSet(NodeState before, NodeState after) {
>-            this.before = before;
>-            this.after = after;
>+        public boolean isLocal(String sessionId) {
>+            return Objects.equal(getSessionId(), sessionId);
>         }
>
>-        /**
>-         * Determine whether these changes originate from the local
>cluster node
>-         * or an external cluster node.
>-         * @return  {@code true} iff the changes originate from a remote
>cluster node.
>-         */
>-        public abstract boolean isExternal();
>-
>-        /**
>-         * Determine whether these changes where caused by the passed
>content
>-         * session.
>-         * @param contentSession  content session to test for
>-         * @return  {@code true} iff these changes where cause by the
>passed content session.
>-         *          Always {@code false} if {@link #isExternal()} is
>{@code true}.
>-         */
>-        public abstract boolean isLocal(ContentSession contentSession);
>+        @CheckForNull
>+        public String getSessionId() {
>+            return getStringOrNull(getCommitInfo(after),
>CommitInfoEditorProvider.SESSION_ID);
>+        }
>
>-        /**
>-         * Determine the user associated with these changes.
>-         * @return  user id or {@link ObservationConstants#OAK_UNKNOWN}
>if {@link #isExternal()} is {@code true}.
>-         */
>-        public abstract String getUserId();
>+        @CheckForNull
>+        public String getUserId() {
>+            return getStringOrNull(getCommitInfo(after),
>CommitInfoEditorProvider.USER_ID);
>+        }
>
>-        /**
>-         * Determine the date when these changes where persisted.
>-         * @return  date or {@code 0} if {@link #isExternal()} is {@code
>true}.
>-         */
>-        public abstract long getDate();
>+        public long getDate() {
>+            PropertyState property =
>getCommitInfo(after).getProperty(CommitInfoEditorProvider.TIME_STAMP);
>+            return property == null ? 0 : property.getValue(LONG);
>+        }
>
>         /**
>          * State before the change
>          * @return  before state
>          */
>+        @Nonnull
>         public NodeState getBeforeState() {
>             return before;
>         }
>@@ -250,6 +281,7 @@ public class ChangeDispatcher {
>          * State after the change
>          * @return  after state
>          */
>+        @Nonnull
>         public NodeState getAfterState() {
>             return after;
>         }
>@@ -259,8 +291,10 @@ public class ChangeDispatcher {
>             return toStringHelper(this)
>                 .add("base", before)
>                 .add("head", after)
>-                .add("userId", getUserId())
>-                .add("date", getDate())
>+                .add(CommitInfoEditorProvider.USER_ID, getUserId())
>+                .add(CommitInfoEditorProvider.TIME_STAMP, getDate())
>+                .add(CommitInfoEditorProvider.SESSION_ID, getSessionId())
>+                .add("external", isExternal)
>                 .toString();
>         }
>
>@@ -274,7 +308,8 @@ public class ChangeDispatcher {
>             }
>
>             ChangeSet that = (ChangeSet) other;
>-            return before.equals(that.before) &&
>after.equals(that.after);
>+            return before.equals(that.before) &&
>after.equals(that.after) &&
>+                    isExternal == that.isExternal;
>         }
>
>         @Override
>@@ -282,73 +317,13 @@ public class ChangeDispatcher {
>             return 31 * before.hashCode() + after.hashCode();
>         }
>
>-        private static class InternalChangeSet extends ChangeSet {
>-            private final ContentSession contentSession;
>-            private final String userId;
>-            private final long date;
>-
>-            InternalChangeSet(NodeState base, NodeState head,
>ContentSession contentSession, long date) {
>-                super(base, head);
>-                this.contentSession = contentSession;
>-                this.userId = contentSession.getAuthInfo().getUserID();
>-                this.date = date;
>-            }
>-
>-            @Override
>-            public boolean isExternal() {
>-                return false;
>-            }
>-
>-            @Override
>-            public boolean isLocal(ContentSession contentSession) {
>-                return this.contentSession == contentSession;
>-            }
>-
>-            @Override
>-            public String getUserId() {
>-                return userId;
>-            }
>-
>-            @Override
>-            public long getDate() {
>-                return date;
>-            }
>-
>-            @Override
>-            public boolean equals(Object other) {
>-                if (!super.equals(other)) {
>-                    return false;
>-                }
>-
>-                InternalChangeSet that = (InternalChangeSet) other;
>-                return date == that.date && contentSession ==
>that.contentSession;
>-            }
>+        private static String getStringOrNull(NodeState commitInfo,
>String name) {
>+            PropertyState property = commitInfo.getProperty(name);
>+            return property == null ? null : property.getValue(STRING);
>         }
>
>-        private static class ExternalChangeSet extends ChangeSet {
>-            ExternalChangeSet(NodeState base, NodeState head) {
>-                super(base, head);
>-            }
>-
>-            @Override
>-            public boolean isExternal() {
>-                return true;
>-            }
>-
>-            @Override
>-            public boolean isLocal(ContentSession contentSession) {
>-                return false;
>-            }
>-
>-            @Override
>-            public String getUserId() {
>-                return OAK_UNKNOWN;
>-            }
>-
>-            @Override
>-            public long getDate() {
>-                return 0;
>-            }
>+        private static NodeState getCommitInfo(NodeState after) {
>+            return
>after.getChildNode(CommitInfoEditorProvider.COMMIT_INFO);
>         }
>
>     }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/ChangeProcessor.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1532
>782&r1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/ChangeProcessor.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/ChangeProcessor.java Wed Oct 16 14:43:01 2013
>@@ -18,10 +18,22 @@
>  */
> package org.apache.jackrabbit.oak.plugins.observation;
>
>+import static com.google.common.base.Preconditions.checkArgument;
>+import static com.google.common.base.Preconditions.checkState;
>+import static com.google.common.collect.Iterators.emptyIterator;
>+import static com.google.common.collect.Iterators.singletonIterator;
>+import static com.google.common.collect.Iterators.transform;
>+import static javax.jcr.observation.Event.NODE_ADDED;
>+import static javax.jcr.observation.Event.NODE_REMOVED;
>+import static javax.jcr.observation.Event.PROPERTY_ADDED;
>+import static javax.jcr.observation.Event.PROPERTY_REMOVED;
>+import static
>org.apache.jackrabbit.oak.plugins.identifier.IdentifierManager.getIdentifi
>er;
>+
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import java.util.concurrent.atomic.AtomicReference;
>+
> import javax.annotation.Nonnull;
> import javax.jcr.observation.Event;
> import javax.jcr.observation.EventListener;
>@@ -50,17 +62,6 @@ import org.apache.jackrabbit.oak.spi.whi
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
>-import static com.google.common.base.Preconditions.checkArgument;
>-import static com.google.common.base.Preconditions.checkState;
>-import static com.google.common.collect.Iterators.emptyIterator;
>-import static com.google.common.collect.Iterators.singletonIterator;
>-import static com.google.common.collect.Iterators.transform;
>-import static javax.jcr.observation.Event.NODE_ADDED;
>-import static javax.jcr.observation.Event.NODE_REMOVED;
>-import static javax.jcr.observation.Event.PROPERTY_ADDED;
>-import static javax.jcr.observation.Event.PROPERTY_REMOVED;
>-import static
>org.apache.jackrabbit.oak.plugins.identifier.IdentifierManager.getIdentifi
>er;
>-
> /**
>  * A {@code ChangeProcessor} generates observation {@link
>javax.jcr.observation.Event}s
>  * based on a {@link EventFilter} and delivers them to an {@link
>javax.jcr.observation.EventListener}.
>@@ -189,7 +190,9 @@ public class ChangeProcessor implements
>             ChangeSet changes = changeListener.getChanges();
>             while (!stopping && changes != null) {
>                 EventFilter filter = filterRef.get();
>-                if (!(filter.excludeLocal() &&
>changes.isLocal(contentSession))) {
>+                // FIXME don't rely on toString for session id
>+                // FIXME make cluster node id part of session id
>+                if (!(filter.excludeLocal() &&
>changes.isLocal(contentSession.toString()))) {
>                     String path =
>namePathMapper.getOakPath(filter.getPath());
>                     ImmutableTree beforeTree =
>getTree(changes.getBeforeState(), path);
>                     ImmutableTree afterTree =
>getTree(changes.getAfterState(), path);
>
>Added:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/CommitInfoEditorProvider.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/observation/CommitInfoEditorProvider.java
>?rev=1532782&view=auto
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/CommitInfoEditorProvider.java (added)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/CommitInfoEditorProvider.java Wed Oct 16 14:43:01 2013
>@@ -0,0 +1,69 @@
>+package org.apache.jackrabbit.oak.plugins.observation;
>+
>+import static com.google.common.base.Preconditions.checkNotNull;
>+
>+import javax.annotation.Nonnull;
>+
>+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
>+import org.apache.jackrabbit.oak.spi.commit.Editor;
>+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
>+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
>+import org.apache.jackrabbit.oak.spi.state.NodeState;
>+
>+/**
>+ * Provider for a {@link Editor} that amends each commit with
>+ * a commit info record. That record is stored in a child node
>+ * named {@link #COMMIT_INFO} of the root node.
>+ */
>+public class CommitInfoEditorProvider implements EditorProvider {
>+
>+    /**
>+     * Node name for the commit info record
>+     */
>+    public static final String COMMIT_INFO = ":commit-info";
>+
>+    /**
>+     * Name of the property containing the id of the session that
>committed
>+     * this revision.
>+     * <p>
>+     * In a clustered environment this property might contain a
>synthesised
>+     * value if the respective revision is the result of a cluster sync.
>+     */
>+    public static final String SESSION_ID = "session-id";
>+
>+    /**
>+     * Name of the property containing the id of the user that committed
>+     * this revision.
>+     * <p>
>+     * In a clustered environment this property might contain a
>synthesised
>+     * value if the respective revision is the result of a cluster sync.
>+     */
>+    public static final String USER_ID = "user-id";
>+
>+    /**
>+     * Name of the property containing the time stamp when this revision
>was
>+     * committed.
>+     */
>+    public static final String TIME_STAMP = "time-stamp";
>+
>+    private final String sessionId;
>+    private final String userId;
>+
>+    public CommitInfoEditorProvider(@Nonnull String sessionId, String
>userID) {
>+        this.sessionId = checkNotNull(sessionId);
>+        this.userId = userID;
>+    }
>+
>+    @Override
>+    public Editor getRootEditor(NodeState before, NodeState after, final
>NodeBuilder builder) {
>+        return new DefaultEditor() {
>+            @Override
>+            public void enter(NodeState before, NodeState after) {
>+                NodeBuilder commitInfo =
>builder.setChildNode(COMMIT_INFO);
>+                commitInfo.setProperty(USER_ID, String.valueOf(userId));
>+                commitInfo.setProperty(SESSION_ID, sessionId);
>+                commitInfo.setProperty(TIME_STAMP,
>System.currentTimeMillis());
>+            }
>+        };
>+    }
>+}
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/Observable.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/observation/Observable.java?rev=1532782&r
>1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/Observable.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/observation/Observable.java Wed Oct 16 14:43:01 2013
>@@ -23,14 +23,15 @@ import org.apache.jackrabbit.oak.plugins
>
> /**
>  * An {@code Observable} supports attaching {@link Listener} instances
>for
>- * listening to changes in a {@code ContentSession}.
>+ * listening to content changes.
>+ *
>  * @see ChangeDispatcher
>  */
> public interface Observable {
>
>     /**
>      * Register a new {@code Listener}. Clients need to call
>-     * {@link ChangeDispatcher.Listener#dispose()} when to free
>+     * {@link ChangeDispatcher.Listener#dispose()} to free
>      * up any resources associated with the listener when done.
>      * @return a fresh {@code Listener} instance.
>      */
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStore.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1532782
>&r1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStore.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStore.java Wed Oct 16 14:43:01 2013
>@@ -29,16 +29,18 @@ import javax.annotation.Nonnull;
>
> import org.apache.jackrabbit.oak.api.Blob;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
>+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
>+import
>org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
>+import org.apache.jackrabbit.oak.plugins.observation.Observable;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
> import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
> import org.apache.jackrabbit.oak.spi.commit.Observer;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
>
>-public class SegmentNodeStore implements NodeStore {
>+public class SegmentNodeStore implements NodeStore, Observable {
>
>     static final String ROOT = "root";
>
>@@ -48,6 +50,8 @@ public class SegmentNodeStore implements
>
>     private final Observer observer;
>
>+    private final ChangeDispatcher changeDispatcher;
>+
>     private SegmentNodeState head;
>
>     private long maximumBackoff = MILLISECONDS.convert(10, SECONDS);
>@@ -58,6 +62,7 @@ public class SegmentNodeStore implements
>         this.observer = EmptyObserver.INSTANCE;
>         this.head = new SegmentNodeState(
>                 store.getWriter().getDummySegment(),
>this.journal.getHead());
>+        this.changeDispatcher = new ChangeDispatcher(this);
>     }
>
>     public SegmentNodeStore(SegmentStore store) {
>@@ -78,7 +83,22 @@ public class SegmentNodeStore implements
>     }
>
>     boolean setHead(SegmentNodeState base, SegmentNodeState head) {
>-        return journal.setHead(base.getRecordId(), head.getRecordId());
>+        changeDispatcher.beforeCommit(base.getChildNode(ROOT));
>+        try {
>+            if (journal.setHead(base.getRecordId(), head.getRecordId()))
>{
>+                changeDispatcher.localCommit(head.getChildNode(ROOT));
>+                return true;
>+            } else {
>+                return false;
>+            }
>+        } finally {
>+            changeDispatcher.afterCommit(getRoot());
>+        }
>+    }
>+
>+    @Override
>+    public Listener newListener() {
>+        return changeDispatcher.newListener();
>     }
>
>     @Override @Nonnull
>@@ -89,7 +109,7 @@ public class SegmentNodeStore implements
>     @Override
>     public synchronized NodeState merge(
>             @Nonnull NodeBuilder builder,
>-            @Nonnull CommitHook commitHook, PostCommitHook committed)
>+            @Nonnull CommitHook commitHook)
>             throws CommitFailedException {
>         checkArgument(builder instanceof SegmentNodeBuilder);
>         checkNotNull(commitHook);
>@@ -98,7 +118,7 @@ public class SegmentNodeStore implements
>         SegmentNodeStoreBranch branch = new SegmentNodeStoreBranch(
>                 this, store.getWriter(), head, maximumBackoff);
>         branch.setRoot(builder.getNodeState());
>-        NodeState merged = branch.merge(commitHook, committed);
>+        NodeState merged = branch.merge(commitHook);
>         ((SegmentNodeBuilder) builder).reset(merged);
>         return merged;
>     }
>@@ -147,5 +167,4 @@ public class SegmentNodeStore implements
>                 new
>SegmentNodeState(store.getWriter().getDummySegment(), id);
>         return root.getChildNode(ROOT);
>     }
>-
> }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStoreBranch.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1
>532782&r1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStoreBranch.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStoreBranch.java Wed Oct 16 14:43:01 2013
>@@ -28,7 +28,6 @@ import javax.annotation.Nonnull;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.commons.PathUtils;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
>@@ -88,7 +87,7 @@ class SegmentNodeStoreBranch implements
>         }
>     }
>
>-    private synchronized long optimisticMerge(CommitHook hook,
>PostCommitHook committed)
>+    private synchronized long optimisticMerge(CommitHook hook)
>             throws CommitFailedException, InterruptedException {
>         long timeout = 1;
>
>@@ -111,10 +110,8 @@ class SegmentNodeStoreBranch implements
>                 // someone else has a pessimistic lock on the journal,
>                 // so we should not try to commit anything
>             } else if (store.setHead(base, newHead)) {
>-                NodeState previousBase = base;
>                 base = newHead;
>                 head = newHead;
>-
>committed.contentChanged(previousBase.getChildNode(ROOT),
>newHead.getChildNode(ROOT));
>                 return -1;
>             }
>
>@@ -136,7 +133,7 @@ class SegmentNodeStoreBranch implements
>         return MILLISECONDS.convert(timeout, NANOSECONDS);
>     }
>
>-    private synchronized void pessimisticMerge(CommitHook hook,
>PostCommitHook committed, long timeout)
>+    private synchronized void pessimisticMerge(CommitHook hook, long
>timeout)
>             throws CommitFailedException {
>         while (true) {
>             SegmentNodeState before = store.getHead();
>@@ -168,10 +165,8 @@ class SegmentNodeStoreBranch implements
>                     SegmentNodeState newHead =
>                             writer.writeNode(builder.getNodeState());
>                     if (store.setHead(after, newHead)) {
>-                        NodeState previousBase = base;
>                         base = newHead;
>                         head = newHead;
>-
>committed.contentChanged(previousBase.getChildNode(ROOT),
>newHead.getChildNode(ROOT));
>                         return;
>                     } else {
>                         // something else happened, perhaps a timeout, so
>@@ -185,13 +180,13 @@ class SegmentNodeStoreBranch implements
>     }
>
>     @Override @Nonnull
>-    public synchronized NodeState merge(CommitHook hook, PostCommitHook
>committed)
>+    public synchronized NodeState merge(CommitHook hook)
>             throws CommitFailedException {
>         if (base != head) {
>             try {
>-                long timeout = optimisticMerge(hook, committed);
>+                long timeout = optimisticMerge(hook);
>                 if (timeout >= 0) {
>-                    pessimisticMerge(hook, committed, timeout);
>+                    pessimisticMerge(hook, timeout);
>                 }
>             } catch (InterruptedException e) {
>                 throw new CommitFailedException(
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStoreService.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=
>1532782&r1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStoreService.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/segment/SegmentNodeStoreService.java Wed Oct 16 14:43:01 2013
>@@ -24,6 +24,7 @@ import java.util.Dictionary;
> import javax.annotation.CheckForNull;
> import javax.annotation.Nonnull;
>
>+import com.google.common.base.Preconditions;
> import com.mongodb.Mongo;
> import org.apache.felix.scr.annotations.Activate;
> import org.apache.felix.scr.annotations.Component;
>@@ -33,10 +34,11 @@ import org.apache.felix.scr.annotations.
> import org.apache.felix.scr.annotations.Service;
> import org.apache.jackrabbit.oak.api.Blob;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
>+import
>org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
>+import org.apache.jackrabbit.oak.plugins.observation.Observable;
> import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
> import org.apache.jackrabbit.oak.plugins.segment.mongo.MongoStore;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
>@@ -44,7 +46,7 @@ import org.osgi.service.component.Compon
>
> @Component(policy = ConfigurationPolicy.REQUIRE)
> @Service(NodeStore.class)
>-public class SegmentNodeStoreService implements NodeStore {
>+public class SegmentNodeStoreService implements NodeStore, Observable {
>
>     @Property(description="The unique name of this instance")
>     public static final String NAME = "name";
>@@ -145,6 +147,15 @@ public class SegmentNodeStoreService imp
>         }
>     }
>
>+    //------------------------------------------------------------<
>Observable >---
>+
>+    @Override
>+    public Listener newListener() {
>+        Preconditions.checkState(delegate instanceof Observable);
>+        return ((Observable) getDelegate()).newListener();
>+    }
>+
>+
>     //---------------------------------------------------------<
>NodeStore >--
>
>     @Override @Nonnull
>@@ -154,9 +165,9 @@ public class SegmentNodeStoreService imp
>
>     @Nonnull
>     @Override
>-    public NodeState merge(@Nonnull NodeBuilder builder, @Nonnull
>CommitHook commitHook,
>-            PostCommitHook committed) throws CommitFailedException {
>-        return getDelegate().merge(builder, commitHook, committed);
>+    public NodeState merge(@Nonnull NodeBuilder builder, @Nonnull
>CommitHook commitHook)
>+            throws CommitFailedException {
>+        return getDelegate().merge(builder, commitHook);
>     }
>
>     @Override
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>lifecycle/OakInitializer.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/spi/lifecycle/OakInitializer.java?rev=1532782&r1=
>1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>lifecycle/OakInitializer.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>lifecycle/OakInitializer.java Wed Oct 16 14:43:01 2013
>@@ -21,11 +21,10 @@ package org.apache.jackrabbit.oak.spi.li
> import javax.annotation.Nonnull;
>
> import org.apache.jackrabbit.oak.api.CommitFailedException;
>-import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
> import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
>+import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
> import org.apache.jackrabbit.oak.spi.commit.EditorHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
>@@ -43,8 +42,7 @@ public final class OakInitializer {
>             initializer.initialize(builder);
>             store.merge(
>                     builder,
>-                    new EditorHook(new IndexUpdateProvider(indexEditor)),
>-                    PostCommitHook.EMPTY);
>+                    new EditorHook(new
>IndexUpdateProvider(indexEditor)));
>         } catch (CommitFailedException e) {
>             throw new RuntimeException(e);
>         }
>@@ -63,8 +61,7 @@ public final class OakInitializer {
>         try {
>             store.merge(
>                     builder,
>-                    new EditorHook(new IndexUpdateProvider(indexEditor)),
>-                    PostCommitHook.EMPTY);
>+                    new EditorHook(new
>IndexUpdateProvider(indexEditor)));
>         } catch (CommitFailedException e) {
>             throw new RuntimeException(e);
>         }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>state/NodeStore.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/spi/state/NodeStore.java?rev=1532782&r1=1532781&r
>2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>state/NodeStore.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>state/NodeStore.java Wed Oct 16 14:43:01 2013
>@@ -25,7 +25,6 @@ import javax.annotation.Nonnull;
> import org.apache.jackrabbit.oak.api.Blob;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
>
> /**
>  * Storage abstraction for trees. At any given point in time the stored
>@@ -51,15 +50,14 @@ public interface NodeStore {
>      *
>      * @param builder  the builder whose changes to apply
>      * @param commitHook the commit hook to apply while merging changes
>-     * @param committed  the post commit hook
>      * @return the node state resulting from the merge.
>      * @throws CommitFailedException if the merge failed
>      * @throws IllegalArgumentException if the builder is not acquired
>      *                                  from a root state of this store
>      */
>     @Nonnull
>-    NodeState merge(@Nonnull NodeBuilder builder, @Nonnull CommitHook
>commitHook,
>-            PostCommitHook committed) throws CommitFailedException;
>+    NodeState merge(@Nonnull NodeBuilder builder, @Nonnull CommitHook
>commitHook)
>+            throws CommitFailedException;
>
>     /**
>      * Rebase the changes in the passed {@code builder} on top of the
>current root state.
>@@ -114,5 +112,4 @@ public interface NodeStore {
>      */
>     @CheckForNull
>     NodeState retrieve(@Nonnull String checkpoint);
>-
> }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>state/NodeStoreBranch.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/spi/state/NodeStoreBranch.java?rev=1532782&r1=153
>2781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>state/NodeStoreBranch.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/
>state/NodeStoreBranch.java Wed Oct 16 14:43:01 2013
>@@ -20,7 +20,6 @@ import javax.annotation.Nonnull;
>
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.spi.commit.CommitHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
>
> /**
>  * An instance of this class represents a private branch of the tree in a
>@@ -86,13 +85,12 @@ public interface NodeStoreBranch {
>      * the current head revision followed by a fast forward merge.
>      *
>      * @param hook the commit hook to apply while merging changes
>-     * @param committed the post commit hook to call after a successful
>merge
>      * @return the node state resulting from the merge.
>      * @throws CommitFailedException if the merge failed
>      * @throws IllegalStateException if the branch is already merged
>      */
>     @Nonnull
>-    NodeState merge(@Nonnull CommitHook hook, PostCommitHook committed)
>throws CommitFailedException;
>+    NodeState merge(@Nonnull CommitHook hook) throws
>CommitFailedException;
>
>     /**
>      * Rebase the changes from this branch on top of the current
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeBuilderTest.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/o
>rg/apache/jackrabbit/oak/kernel/KernelNodeBuilderTest.java?rev=1532782&r1=
>1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeBuilderTest.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeBuilderTest.java Wed Oct 16 14:43:01 2013
>@@ -26,10 +26,8 @@ import org.apache.jackrabbit.mk.core.Mic
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
> import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
>-import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
> import org.junit.Test;
>
> public class KernelNodeBuilderTest {
>@@ -51,7 +49,7 @@ public class KernelNodeBuilderTest {
>     private static void init(NodeStore store) throws
>CommitFailedException {
>         NodeBuilder builder = store.getRoot().builder();
>         builder.child("x").child("y").child("z");
>-        store.merge(builder, EmptyHook.INSTANCE, PostCommitHook.EMPTY);
>+        store.merge(builder, EmptyHook.INSTANCE);
>     }
>
>     private static void run(NodeStore store) throws
>CommitFailedException {
>@@ -73,7 +71,7 @@ public class KernelNodeBuilderTest {
>         assertFalse("child node x/y/z not should not be present", builder
>                 .child("x").child("y").hasChildNode("z"));
>
>-        store.merge(builder, EmptyHook.INSTANCE, PostCommitHook.EMPTY);
>+        store.merge(builder, EmptyHook.INSTANCE);
>     }
>
> }
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStateTest.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/o
>rg/apache/jackrabbit/oak/kernel/KernelNodeStateTest.java?rev=1532782&r1=15
>32781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStateTest.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStateTest.java Wed Oct 16 14:43:01 2013
>@@ -33,7 +33,6 @@ import org.apache.jackrabbit.mk.core.Mic
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.api.PropertyState;
> import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
>@@ -58,7 +57,8 @@ public class KernelNodeStateTest {
>         builder.child("y");
>         builder.child("z");
>
>-        state = store.merge(builder, EmptyHook.INSTANCE,
>PostCommitHook.EMPTY);
>+        state = store.merge(builder, EmptyHook.INSTANCE);
>+        state = store.merge(builder, EmptyHook.INSTANCE);
>     }
>
>     @After
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStoreCacheTest.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/o
>rg/apache/jackrabbit/oak/kernel/KernelNodeStoreCacheTest.java?rev=1532782&
>r1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStoreCacheTest.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/KernelNodeStoreCacheTest.java Wed Oct 16 14:43:01 2013
>@@ -29,7 +29,6 @@ import org.apache.jackrabbit.mk.api.Micr
> import org.apache.jackrabbit.mk.api.MicroKernelException;
> import org.apache.jackrabbit.mk.core.MicroKernelImpl;
> import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
>@@ -61,7 +60,7 @@ public class KernelNodeStoreCacheTest {
>         b.child("c");
>         b.child("d");
>         b.child("e");
>-        store.merge(builder, EmptyHook.INSTANCE, PostCommitHook.EMPTY);
>+        store.merge(builder, EmptyHook.INSTANCE);
>     }
>
>     /**
>@@ -138,7 +137,7 @@ public class KernelNodeStoreCacheTest {
>     private void modifyContent() throws Exception {
>         NodeBuilder builder = store.getRoot().builder();
>         builder.child("a").setProperty("foo", "bar");
>-        store.merge(builder, EmptyHook.INSTANCE, PostCommitHook.EMPTY);
>+        store.merge(builder, EmptyHook.INSTANCE);
>     }
>
>     private void readTree(NodeState root) {
>
>Modified:
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/LargeKernelNodeStateTest.java
>URL:
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/o
>rg/apache/jackrabbit/oak/kernel/LargeKernelNodeStateTest.java?rev=1532782&
>r1=1532781&r2=1532782&view=diff
>==========================================================================
>====
>---
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/LargeKernelNodeStateTest.java (original)
>+++
>jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/kern
>el/LargeKernelNodeStateTest.java Wed Oct 16 14:43:01 2013
>@@ -25,7 +25,6 @@ import static junit.framework.Assert.ass
> import org.apache.jackrabbit.mk.core.MicroKernelImpl;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
>-import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
> import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
> import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
> import org.apache.jackrabbit.oak.spi.state.NodeState;
>@@ -50,7 +49,7 @@ public class LargeKernelNodeStateTest {
>             builder.child("x" + i);
>         }
>
>-        state = store.merge(builder, EmptyHook.INSTANCE,
>PostCommitHook.EMPTY);
>+        state = store.merge(builder, EmptyHook.INSTANCE);
>     }
>
>     @After
>
>


Mime
View raw message