jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r1514457 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/mongomk/ test/java/org/apache/jackrabbit/oak/plugins/mongomk/
Date Thu, 15 Aug 2013 19:46:29 GMT
Author: mreutegg
Date: Thu Aug 15 19:46:28 2013
New Revision: 1514457

URL: http://svn.apache.org/r1514457
Log:
OAK-926: MongoMK: split documents when they are too large
- Move more node specific code from MongoMK to NodeDocument
- Centralize access to revisions and commit root in NodeDocument

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UpdateOp.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java Thu Aug 15 19:46:28 2013
@@ -102,11 +102,8 @@ class Collision {
         String p = Utils.getPathFromId(document.getId());
         String commitRootPath = null;
         // first check if we can mark the commit with the given revision
-        @SuppressWarnings("unchecked")
-        Map<String, String> revisions = (Map<String, String>) document.get(UpdateOp.REVISIONS);
-        if (revisions != null && revisions.containsKey(revision)) {
-            String value = revisions.get(revision);
-            if (Utils.isCommitted(value)) {
+        if (document.containsRevision(revision)) {
+            if (document.isCommitted(revision)) {
                 // already committed
                 return false;
             }
@@ -116,7 +113,7 @@ class Collision {
         } else {
             // next look at commit root
             @SuppressWarnings("unchecked")
-            Map<String, Integer> commitRoots = (Map<String, Integer>) document.get(UpdateOp.COMMIT_ROOT);
+            Map<String, Integer> commitRoots = (Map<String, Integer>) document.get(NodeDocument.COMMIT_ROOT);
             if (commitRoots != null) {
                 Integer depth = commitRoots.get(revision);
                 if (depth != null) {
@@ -133,13 +130,13 @@ class Collision {
                 Utils.getIdFromPath(commitRootPath), false);
         document = store.find(Collection.NODES, op.getKey());
         // check commit status of revision
-        if (isCommitted(revision, document)) {
+        if (document.isCommitted(revision)) {
             return false;
         }
-        op.setMapEntry(UpdateOp.COLLISIONS, revision, true);
+        op.setMapEntry(NodeDocument.COLLISIONS, revision, true);
         document = store.createOrUpdate(Collection.NODES, op);
         // check again on old document right before our update was applied
-        if (isCommitted(revision, document)) {
+        if (document.isCommitted(revision)) {
             return false;
         }
         // otherwise collision marker was set successfully
@@ -154,18 +151,4 @@ class Collision {
         throw new MicroKernelException("No commit root for revision: "
                 + revision + ", document: " + Utils.formatDocument(document));
     }
-    
-    /**
-     * Returns <code>true</code> if the given <code>revision</code> is marked
-     * committed on the given <code>document</code>.
-     * 
-     * @param revision the revision.
-     * @param document a MongoDB document.
-     * @return <code>true</code> if committed; <code>false</code> otherwise.
-     */
-    private static boolean isCommitted(String revision, Map<String, Object> document) {
-        @SuppressWarnings("unchecked")
-        Map<String, String> revisions = (Map<String, String>) document.get(UpdateOp.REVISIONS);
-        return revisions != null && Utils.isCommitted(revisions.get(revision));
-    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java Thu Aug 15 19:46:28 2013
@@ -88,7 +88,7 @@ public class Commit {
     }
     
     static void setModified(UpdateOp op, Revision revision) {
-        op.set(UpdateOp.MODIFIED, getModified(revision.getTimestamp()));
+        op.set(NodeDocument.MODIFIED, getModified(revision.getTimestamp()));
     }
     
     public static long getModified(long timestamp) {
@@ -110,7 +110,8 @@ public class Commit {
     
     public void touchNode(String path) {
         UpdateOp op = getUpdateOperationForNode(path);
-        op.setMapEntry(UpdateOp.LAST_REV, "" + revision.getClusterId(), revision.toString());        
+        op.setMapEntry(NodeDocument.LAST_REV,
+                String.valueOf(revision.getClusterId()), revision.toString());
     }
     
     void updateProperty(String path, String propertyName, String value) {
@@ -199,15 +200,16 @@ public class Commit {
             if (baseBranchRevision == null) {
                 // only apply _lastRev for trunk commits, _lastRev for
                 // branch commits only become visible on merge
-                op.setMapEntry(UpdateOp.LAST_REV, "" + revision.getClusterId(), revision.toString());
+                op.setMapEntry(NodeDocument.LAST_REV,
+                        String.valueOf(revision.getClusterId()), revision.toString());
             }
             if (op.isNew) {
-                op.setMapEntry(UpdateOp.DELETED, revision.toString(), "false");
+                op.setMapEntry(NodeDocument.DELETED, revision.toString(), "false");
             }
             if (op == commitRoot) {
                 // apply at the end
             } else {
-                op.setMapEntry(UpdateOp.COMMIT_ROOT, revision.toString(), commitRootDepth);
+                op.setMapEntry(NodeDocument.COMMIT_ROOT, revision.toString(), commitRootDepth);
                 if (op.isNew()) {
                     newNodes.add(op);
                 } else {
@@ -219,7 +221,7 @@ public class Commit {
             // no updates and root of commit is also new. that is,
             // it is the root of a subtree added in a commit.
             // so we try to add the root like all other nodes
-            commitRoot.setMapEntry(UpdateOp.REVISIONS, revision.toString(), commitValue);
+            commitRoot.setMapEntry(NodeDocument.REVISIONS, revision.toString(), commitValue);
             newNodes.add(commitRoot);
         }
         try {
@@ -232,7 +234,7 @@ public class Commit {
                         if (op == commitRoot) {
                             // don't write the commit root just yet
                             // (because there might be a conflict)
-                            commitRoot.unsetMapEntry(UpdateOp.REVISIONS, revision.toString());
+                            commitRoot.unsetMapEntry(NodeDocument.REVISIONS, revision.toString());
                         }
                         changedNodes.add(op);
                     }
@@ -241,7 +243,7 @@ public class Commit {
             }
             for (UpdateOp op : changedNodes) {
                 // set commit root on changed nodes
-                op.setMapEntry(UpdateOp.COMMIT_ROOT, revision.toString(), commitRootDepth);
+                op.setMapEntry(NodeDocument.COMMIT_ROOT, revision.toString(), commitRootDepth);
                 opLog.add(op);
                 createOrUpdateNode(store, op);
             }
@@ -250,7 +252,7 @@ public class Commit {
             // first to check if there was a conflict, and only then to commit
             // the revision, with the revision property set)
             if (changedNodes.size() > 0 || !commitRoot.isNew) {
-                commitRoot.setMapEntry(UpdateOp.REVISIONS, revision.toString(), commitValue);
+                commitRoot.setMapEntry(NodeDocument.REVISIONS, revision.toString(), commitValue);
                 opLog.add(commitRoot);
                 createOrUpdateNode(store, commitRoot);
                 operations.put(commitRootPath, commitRoot);
@@ -285,16 +287,19 @@ public class Commit {
         NodeDocument doc = store.createOrUpdate(Collection.NODES, op);
         if (baseRevision != null) {
             final AtomicReference<List<Revision>> collisions = new AtomicReference<List<Revision>>();
-            Revision newestRev = mk.getNewestRevision(doc, revision,
-                    new CollisionHandler() {
-                @Override
-                void concurrentModification(Revision other) {
-                    if (collisions.get() == null) {
-                        collisions.set(new ArrayList<Revision>());
-                    }
-                    collisions.get().add(other);
-                }
-            });
+            Revision newestRev = null;
+            if (doc != null) {
+                newestRev = doc.getNewestRevision(mk, store, revision,
+                        new CollisionHandler() {
+                            @Override
+                            void concurrentModification(Revision other) {
+                                if (collisions.get() == null) {
+                                    collisions.set(new ArrayList<Revision>());
+                                }
+                                collisions.get().add(other);
+                            }
+                        });
+            }
             String conflictMessage = null;
             if (newestRev == null) {
                 if (op.isDelete || !op.isNew) {
@@ -372,7 +377,7 @@ public class Commit {
         }
         // did existence of node change after baseRevision?
         @SuppressWarnings("unchecked")
-        Map<String, String> deleted = (Map<String, String>) nodeMap.get(UpdateOp.DELETED);
+        Map<String, String> deleted = (Map<String, String>) nodeMap.get(NodeDocument.DELETED);
         if (deleted != null) {
             for (Map.Entry<String, String> entry : deleted.entrySet()) {
                 if (mk.isRevisionNewer(Revision.fromString(entry.getKey()), baseRevision)) {
@@ -387,7 +392,7 @@ public class Commit {
             }
             int idx = entry.getKey().indexOf('.');
             String name = entry.getKey().substring(0, idx);
-            if (UpdateOp.DELETED.equals(name)) {
+            if (NodeDocument.DELETED.equals(name)) {
                 // existence of node changed, this always conflicts with
                 // any other concurrent change
                 return true;
@@ -413,7 +418,7 @@ public class Commit {
     private UpdateOp[] splitDocument(Document doc) {
         String id = doc.getId();
         String path = Utils.getPathFromId(id);
-        Long previous = (Long) doc.get(UpdateOp.PREVIOUS);
+        Long previous = (Long) doc.get(NodeDocument.PREVIOUS);
         if (previous == null) {
             previous = 0L;
         } else {
@@ -423,18 +428,19 @@ public class Commit {
         setModified(old, revision);
         UpdateOp main = new UpdateOp(path, id, false);
         setModified(main, revision);
-        main.set(UpdateOp.PREVIOUS, previous);
+        main.set(NodeDocument.PREVIOUS, previous);
         for (Entry<String, Object> e : doc.entrySet()) {
             String key = e.getKey();
             if (key.equals(Document.ID)) {
                 // ok
-            } else if (key.equals(UpdateOp.MODIFIED)) {
+            } else if (key.equals(NodeDocument.MODIFIED)) {
                 // ok
-            } else if (key.equals(UpdateOp.PREVIOUS)) {
+            } else if (key.equals(NodeDocument.PREVIOUS)) {
                 // ok
-            } else if (key.equals(UpdateOp.LAST_REV)) {
+            } else if (key.equals(NodeDocument.LAST_REV)) {
                 // only maintain the lastRev in the main document
-                main.setMap(UpdateOp.LAST_REV, "" + revision.getClusterId(), revision.toString());        
+                main.setMap(NodeDocument.LAST_REV,
+                        String.valueOf(revision.getClusterId()), revision.toString());
             } else {
                 // UpdateOp.DELETED,
                 // UpdateOp.REVISIONS,
@@ -545,7 +551,7 @@ public class Commit {
         removedNodes.add(path);
         UpdateOp op = getUpdateOperationForNode(path);
         op.setDelete(true);
-        op.setMapEntry(UpdateOp.DELETED, revision.toString(), "true");
+        op.setMapEntry(NodeDocument.DELETED, revision.toString(), "true");
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentStore.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentStore.java Thu Aug 15 19:46:28 2013
@@ -116,10 +116,10 @@ public interface DocumentStore {
      *
      * @param collection the collection
      * @param update the update operation
-     * @return the old document
+     * @return the old document or <code>null</code> if it didn't exist before.
      * @throws MicroKernelException if the operation failed.
      */    
-    @Nonnull
+    @CheckForNull
     <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp update)
             throws MicroKernelException;
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java Thu Aug 15 19:46:28 2013
@@ -171,7 +171,7 @@ public class MemoryDocumentStore impleme
         return oldDoc;
     }
 
-    @Nonnull
+    @CheckForNull
     @Override
     public <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp update)
             throws MicroKernelException {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java Thu Aug 15 19:46:28 2013
@@ -74,7 +74,7 @@ public class MongoDocumentStore implemen
         // the _id field is the primary key, so we don't need to define it
         DBObject index = new BasicDBObject();
         // modification time (descending)
-        index.put(UpdateOp.MODIFIED, -1L);
+        index.put(NodeDocument.MODIFIED, -1L);
         DBObject options = new BasicDBObject();
         options.put("unique", Boolean.FALSE);
         nodes.ensureIndex(index, options);
@@ -140,7 +140,11 @@ public class MongoDocumentStore implemen
                 doc = nodesCache.get(key, new Callable<NodeDocument>() {
                     @Override
                     public NodeDocument call() throws Exception {
-                        return (NodeDocument) findUncached(collection, key);
+                        NodeDocument doc = (NodeDocument) findUncached(collection, key);
+                        if (doc == null) {
+                            doc = NodeDocument.NULL;
+                        }
+                        return doc;
                     }
                 });
                 if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
@@ -152,8 +156,12 @@ public class MongoDocumentStore implemen
                 // too old: invalidate, try again
                 nodesCache.invalidate(key);
             }
-            //noinspection unchecked
-            return (T) doc;
+            if (doc == NodeDocument.NULL) {
+                return null;
+            } else {
+                //noinspection unchecked
+                return (T) doc;
+            }
         } catch (ExecutionException e) {
             throw new IllegalStateException("Failed to load document with " + key, e);
         }
@@ -331,7 +339,7 @@ public class MongoDocumentStore implemen
         }
     }
 
-    @Nonnull
+    @CheckForNull
     @Override
     public <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp update)
             throws MicroKernelException {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Thu Aug 15 19:46:28 2013
@@ -21,15 +21,11 @@ import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +33,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -69,7 +66,7 @@ import com.mongodb.DB;
 /**
  * A MicroKernel implementation that stores the data in a MongoDB.
  */
-public class MongoMK implements MicroKernel {
+public class MongoMK implements MicroKernel, RevisionContext {
 
     /**
      * The threshold where special handling for many child node starts.
@@ -337,21 +334,20 @@ public class MongoMK implements MicroKer
     
     void backgroundRead() {
         String id = Utils.getIdFromPath("/");
-        Map<String, Object> map = store.find(Collection.NODES, id, asyncDelay);
-        @SuppressWarnings("unchecked")
-        Map<String, String> lastRevMap = (Map<String, String>) map.get(UpdateOp.LAST_REV);
+        NodeDocument doc = store.find(Collection.NODES, id, asyncDelay);
+        Map<Integer, Revision> lastRevMap = doc.getLastRev();
         
         boolean hasNewRevisions = false;
         // the (old) head occurred first
         Revision headSeen = Revision.newRevision(0);
         // then we saw this new revision (from another cluster node) 
         Revision otherSeen = Revision.newRevision(0);
-        for (Entry<String, String> e : lastRevMap.entrySet()) {
-            int machineId = Integer.parseInt(e.getKey());
+        for (Entry<Integer, Revision> e : lastRevMap.entrySet()) {
+            int machineId = e.getKey();
             if (machineId == clusterId) {
                 continue;
             }
-            Revision r = Revision.fromString(e.getValue());
+            Revision r = e.getValue();
             Revision last = lastKnownRevision.get(machineId);
             if (last == null || r.compareRevisionTime(last) > 0) {
                 lastKnownRevision.put(machineId, r);
@@ -467,30 +463,6 @@ public class MongoMK implements MicroKer
         }
     }
     
-    private boolean includeRevision(Revision x, Revision requestRevision) {
-        Branch b = branches.getBranch(x);
-        if (b != null) {
-            // only include if requested revision is also a branch revision
-            // with a history including x
-            if (b.containsCommit(requestRevision)) {
-                // in same branch, include if the same revision or
-                // requestRevision is newer
-                return x.equals(requestRevision) || isRevisionNewer(requestRevision, x);
-            }
-            // not part of branch identified by requestedRevision
-            return false;
-        }
-        // assert: x is not a branch commit
-        b = branches.getBranch(requestRevision);
-        if (b != null) {
-            // reset requestRevision to branch base revision to make
-            // sure we don't include revisions committed after branch
-            // was created
-            requestRevision = b.getBase(requestRevision);
-        }
-        return revisionComparator.compare(requestRevision, x) >= 0;
-    }
-    
     /**
      * Checks that revision x is newer than another revision.
      * 
@@ -502,129 +474,10 @@ public class MongoMK implements MicroKer
         return revisionComparator.compare(x, previous) > 0;
     }
 
-    /**
-     * Checks if the revision is valid for the given document. A revision is
-     * considered valid if the given document is the root of the commit, or the
-     * commit root has the revision set. This method may read further documents
-     * to perform this check.
-     * This method also takes pending branches into consideration.
-     * The <code>readRevision</code> identifies the read revision used by the
-     * client, which may be a branch revision logged in {@link #branches}.
-     * The revision <code>rev</code> is valid if it is part of the branch
-     * history of <code>readRevision</code>.
-     *
-     * @param rev     revision to check.
-     * @param readRevision the read revision of the client.
-     * @param doc the document to check.
-     * @param validRevisions set of revisions already checked against
-     *                       <code>readRevision</code> and considered valid.
-     * @return <code>true</code> if the revision is valid; <code>false</code>
-     *         otherwise.
-     */
-    boolean isValidRevision(@Nonnull Revision rev,
-                            @Nonnull Revision readRevision,
-                            @Nonnull Document doc,
-                            @Nonnull Set<Revision> validRevisions) {
-        if (validRevisions.contains(rev)) {
-            return true;
-        }
-        @SuppressWarnings("unchecked")
-        Map<String, String> revisions = (Map<String, String>) doc.get(UpdateOp.REVISIONS);
-        if (isCommitted(rev, readRevision, revisions)) {
-            validRevisions.add(rev);
-            return true;
-        } else if (revisions != null && revisions.containsKey(rev.toString())) {
-            // rev is in revisions map of this node, but not committed
-            // no need to check _commitRoot field
-            return false;
-        }
-        // check commit root
-        @SuppressWarnings("unchecked")
-        Map<String, Integer> commitRoot = (Map<String, Integer>) doc.get(UpdateOp.COMMIT_ROOT);
-        String commitRootPath = null;
-        if (commitRoot != null) {
-            Integer depth = commitRoot.get(rev.toString());
-            if (depth != null) {
-                String p = Utils.getPathFromId(doc.getId());
-                commitRootPath = PathUtils.getAncestorPath(p, PathUtils.getDepth(p) - depth);
-            }
-        }
-        if (commitRootPath == null) {
-            // shouldn't happen, either node is commit root for a revision
-            // or has a reference to the commit root
-            LOG.warn("Node {} does not have commit root reference for revision {}",
-                    doc.getId(), rev);
-            LOG.warn(doc.toString());
-            return false;
-        }
-        // get root of commit
-        doc = store.find(Collection.NODES,
-                Utils.getIdFromPath(commitRootPath));
-        if (doc == null) {
-            return false;
-        }
-        @SuppressWarnings("unchecked")
-        Map<String, String> rootRevisions = (Map<String, String>) doc.get(UpdateOp.REVISIONS);
-        if (isCommitted(rev, readRevision, rootRevisions)) {
-            validRevisions.add(rev);
-            return true;
-        }
-        return false;
-    }
-    
     public long getSplitDocumentAgeMillis() {
         return this.splitDocumentAgeMillis;
     }
 
-    /**
-     * Returns <code>true</code> if the given revision
-     * {@link Utils#isCommitted(String)} in the revisions map and is visible
-     * from the <code>readRevision</code>.
-     *
-     * @param revision  the revision to check.
-     * @param readRevision the read revision.
-     * @param revisions the revisions map, or <code>null</code> if none is set.
-     * @return <code>true</code> if the revision is committed, otherwise
-     *         <code>false</code>.
-     */
-    private boolean isCommitted(@Nonnull Revision revision,
-                                @Nonnull Revision readRevision,
-                                @Nullable Map<String, String> revisions) {
-        if (Commit.PURGE_OLD_REVISIONS) {
-            long diff = Revision.getTimestampDifference(Revision.getCurrentTimestamp(), revision.getTimestamp());
-            if (diff >= splitDocumentAgeMillis) {
-                return true;
-            }
-        }
-        if (revision.equals(readRevision)) {
-            return true;
-        }
-        if (revisions == null) {
-            return false;
-        }
-        String value = revisions.get(revision.toString());
-        if (value == null) {
-            return false;
-        }
-        if (Utils.isCommitted(value)) {
-            // resolve commit revision
-            revision = Utils.resolveCommitRevision(revision, value);
-            if (branches.getBranch(readRevision) == null) {
-                // readRevision is not from a branch
-                // compare resolved revision as is
-                return !isRevisionNewer(revision, readRevision);
-            }
-        } else {
-            // branch commit
-            if (Revision.fromString(value).getClusterId() != clusterId) {
-                // this is an unmerged branch commit from another cluster node,
-                // hence never visible to us
-                return false;
-            }
-        }
-        return includeRevision(revision, readRevision);
-    }
-
     public Children getChildren(final String path, final Revision rev, final int limit)  throws MicroKernelException {
         checkRevisionAge(rev, path);
         String key = path + "@" + rev;
@@ -668,7 +521,7 @@ public class MongoMK implements MicroKer
         }
         for (NodeDocument doc : list) {
             // filter out deleted children
-            if (getLiveRevision(doc, rev, validRevisions) == null) {
+            if (doc.getLiveRevision(this, store, rev, validRevisions) == null) {
                 continue;
             }
             // TODO put the whole node in the cache
@@ -678,120 +531,16 @@ public class MongoMK implements MicroKer
         return c;
     }
 
+    @CheckForNull
     private Node readNode(String path, Revision readRevision) {
         String id = Utils.getIdFromPath(path);
-        Document doc = store.find(Collection.NODES, id);
+        NodeDocument doc = store.find(Collection.NODES, id);
         if (doc == null) {
             return null;
         }
-        Revision min = getLiveRevision(doc, readRevision);
-        if (min == null) {
-            // deleted
-            return null;
-        }
-        Node n = new Node(path, readRevision);
-        for (String key : doc.keySet()) {
-            if (!Utils.isPropertyName(key)) {
-                continue;
-            }
-            Object v = doc.get(key);
-            @SuppressWarnings("unchecked")
-            Map<String, String> valueMap = (Map<String, String>) v;
-            if (valueMap != null) {
-                if (valueMap instanceof TreeMap) {
-                    // TODO instanceof should be avoided
-                    // use descending keys (newest first) if map is sorted
-                    valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
-                }
-                String value = getLatestValue(valueMap, min, readRevision);
-                String propertyName = Utils.unescapePropertyName(key);
-                n.setProperty(propertyName, value);
-            }
-        }
-
-        // when was this node last modified?
-        Branch branch = branches.getBranch(readRevision);
-        Revision lastRevision = null;
-        Map<Integer, Revision> lastRevs = new HashMap<Integer, Revision>();
-        @SuppressWarnings("unchecked")
-        Map<String, String> valueMap = (Map<String, String>) doc.get(UpdateOp.LAST_REV);
-        if (valueMap != null) {
-            for (String clusterId : valueMap.keySet()) {
-                lastRevs.put(Integer.parseInt(clusterId),
-                        Revision.fromString(valueMap.get(clusterId)));
-            }
-        }
-        // overlay with unsaved last modified from this instance
-        Revision lastModified = unsavedLastRevisions.get(path);
-        if (lastModified != null) {
-            lastRevs.put(clusterId, lastModified);
-        }
-        // filter out revisions newer than branch base
-        if (branch != null) {
-            Revision base = branch.getBase(readRevision);
-            for (Iterator<Revision> it = lastRevs.values().iterator(); it
-                    .hasNext();) {
-                Revision r = it.next();
-                if (isRevisionNewer(r, base)) {
-                    it.remove();
-                }
-            }
-        }
-        for (Revision r : lastRevs.values()) {
-            // ignore if newer than readRevision
-            if (isRevisionNewer(r, readRevision)) {
-                continue;
-            }
-            if (lastRevision == null || isRevisionNewer(r, lastRevision)) {
-                lastRevision = r;
-            }
-        }
-        if (branch != null) {
-            // read from a branch
-            // -> possibly overlay with unsaved last revs from branch
-            Revision r = branch.getUnsavedLastRevision(path, readRevision);
-            if (r != null) {
-                lastRevision = r;
-            }
-        }
-        if (lastRevision == null) {
-            // use readRevision if none found
-            lastRevision = readRevision;
-        }
-        n.setLastRevision(lastRevision);
-        return n;
+        return doc.getNodeAtRevision(this, store, readRevision);
     }
     
-    /**
-     * Get the latest property value that is larger or equal the min revision,
-     * and smaller or equal the max revision.
-     * 
-     * @param valueMap the revision-value map
-     * @param min the minimum revision (null meaning unlimited)
-     * @param max the maximum revision
-     * @return the value, or null if not found
-     */
-    private String getLatestValue(@Nonnull Map<String, String> valueMap,
-                                  @Nullable Revision min,
-                                  @Nonnull Revision max) {
-        String value = null;
-        Revision latestRev = null;
-        for (String r : valueMap.keySet()) {
-            Revision propRev = Revision.fromString(r);
-            if (min != null && isRevisionNewer(min, propRev)) {
-                continue;
-            }
-            if (latestRev != null && !isRevisionNewer(propRev, latestRev)) {
-                continue;
-            }
-            if (includeRevision(propRev, max)) {
-                latestRev = propRev;
-                value = valueMap.get(r);
-            }
-        }
-        return value;
-    }
-
     @Override
     public String getHeadRevision() throws MicroKernelException {
         return headRevision.toString();
@@ -916,7 +665,7 @@ public class MongoMK implements MicroKer
         String fromKey = Utils.getKeyLowerLimit(path);
         String toKey = Utils.getKeyUpperLimit(path);
         List<NodeDocument> list = store.query(Collection.NODES, fromKey, toKey,
-                UpdateOp.MODIFIED, minValue, Integer.MAX_VALUE);
+                NodeDocument.MODIFIED, minValue, Integer.MAX_VALUE);
         for (NodeDocument doc : list) {
             String id = doc.getId();
             String p = Utils.getPathFromId(id);
@@ -1152,6 +901,58 @@ public class MongoMK implements MicroKer
         return rev.toString();
     }
 
+    //------------------------< RevisionContext >-------------------------------
+
+    @Override
+    public UnmergedBranches getBranches() {
+        return branches;
+    }
+
+    @Override
+    public UnsavedModifications getPendingModifications() {
+        return unsavedLastRevisions;
+    }
+
+    @Override
+    public RevisionComparator getRevisionComparator() {
+        return revisionComparator;
+    }
+
+    @Override
+    public void publishRevision(Revision foreignRevision, Revision changeRevision) {
+        if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
+            // already visible
+            return;
+        }
+        int clusterNodeId = foreignRevision.getClusterId();
+        if (clusterNodeId == this.clusterId) {
+            return;
+        }
+        // the (old) head occurred first
+        Revision headSeen = Revision.newRevision(0);
+        // then we saw this new revision (from another cluster node)
+        Revision otherSeen = Revision.newRevision(0);
+        // and after that, the current change
+        Revision changeSeen = Revision.newRevision(0);
+        revisionComparator.add(foreignRevision, otherSeen);
+        // TODO invalidating the whole cache is not really needed,
+        // but how to ensure we invalidate the right part of the cache?
+        // possibly simply wait for the background thread to pick
+        // up the changes, but this depends on how often this method is called
+        store.invalidateCache();
+        // the latest revisions of the current cluster node
+        // happened before the latest revisions of other cluster nodes
+        revisionComparator.add(headRevision, headSeen);
+        revisionComparator.add(changeRevision, changeSeen);
+        // the head revision is after other revisions
+        headRevision = Revision.newRevision(clusterId);
+    }
+
+    @Override
+    public int getClusterId() {
+        return clusterId;
+    }
+
     private void copyNode(String sourcePath, String targetPath, Revision baseRev, Commit commit) {
         moveOrCopyNode(false, sourcePath, targetPath, baseRev, commit);
     }
@@ -1221,183 +1022,6 @@ public class MongoMK implements MicroKer
         nodeCache.invalidate(path + "@" + rev);
     }
 
-    /**
-     * Get the earliest (oldest) revision where the node was alive at or before
-     * the provided revision, if the node was alive at the given revision.
-     * 
-     * @param doc the document
-     * @param maxRev the maximum revision to return
-     * @return the earliest revision, or null if the node is deleted at the
-     *         given revision
-     */
-    private Revision getLiveRevision(Document doc,
-                                     Revision maxRev) {
-        return getLiveRevision(doc, maxRev, new HashSet<Revision>());
-    }
-
-    /**
-     * Get the earliest (oldest) revision where the node was alive at or before
-     * the provided revision, if the node was alive at the given revision.
-     * 
-     * @param doc the document
-     * @param maxRev the maximum revision to return
-     * @param validRevisions the set of revisions already checked against maxRev
-     *            and considered valid.
-     * @return the earliest revision, or null if the node is deleted at the
-     *         given revision
-     */
-    private Revision getLiveRevision(Document doc,
-            Revision maxRev, Set<Revision> validRevisions) {
-        @SuppressWarnings("unchecked")
-        Map<String, String> valueMap = (Map<String, String>) doc
-                .get(UpdateOp.DELETED);
-        if (valueMap == null) {
-            return null;
-        }
-        // first, search the newest deleted revision
-        Revision deletedRev = null;
-        if (valueMap instanceof TreeMap) {
-            // TODO instanceof should be avoided
-            // use descending keys (newest first) if map is sorted
-            valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
-        }
-        for (String r : valueMap.keySet()) {
-            String value = valueMap.get(r);
-            if (!"true".equals(value)) {
-                // only look at deleted revisions now
-                continue;
-            }
-            Revision propRev = Revision.fromString(r);
-            if (isRevisionNewer(propRev, maxRev)
-                    || !isValidRevision(propRev, maxRev, doc, validRevisions)) {
-                continue;
-            }
-            if (deletedRev == null || isRevisionNewer(propRev, deletedRev)) {
-                deletedRev = propRev;
-            }
-        }
-        // now search the oldest non-deleted revision that is newer than the
-        // newest deleted revision
-        Revision liveRev = null;
-        for (String r : valueMap.keySet()) {
-            String value = valueMap.get(r);
-            if ("true".equals(value)) {
-                // ignore deleted revisions
-                continue;
-            }
-            Revision propRev = Revision.fromString(r);
-            if (deletedRev != null && isRevisionNewer(deletedRev, propRev)) {
-                // the node was deleted later on
-                continue;
-            }
-            if (isRevisionNewer(propRev, maxRev)
-                    || !isValidRevision(propRev, maxRev, doc, validRevisions)) {
-                continue;
-            }
-            if (liveRev == null || isRevisionNewer(liveRev, propRev)) {
-                liveRev = propRev;
-            }
-        }
-        return liveRev;
-    }
-    
-    /**
-     * Get the revision of the latest change made to this node.
-     * 
-     * @param doc the document
-     * @param changeRev the revision of the current change
-     * @param handler the conflict handler, which is called for concurrent changes
-     *                preceding <code>before</code>.
-     * @return the revision, or null if deleted
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable Revision getNewestRevision(Document doc,
-                                         Revision changeRev, CollisionHandler handler) {
-        if (doc == null) {
-            return null;
-        }
-        SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
-        if (doc.containsKey(UpdateOp.REVISIONS)) {
-            revisions.addAll(((Map<String, String>) doc.get(UpdateOp.REVISIONS)).keySet());
-        }
-        if (doc.containsKey(UpdateOp.COMMIT_ROOT)) {
-            revisions.addAll(((Map<String, Integer>) doc.get(UpdateOp.COMMIT_ROOT)).keySet());
-        }
-        Map<String, String> deletedMap = (Map<String, String>) doc
-                .get(UpdateOp.DELETED);
-        if (deletedMap != null) {
-            revisions.addAll(deletedMap.keySet());
-        }
-        Revision newestRev = null;
-        for (String r : revisions) {
-            Revision propRev = Revision.fromString(r);
-            if (isRevisionNewer(propRev, changeRev)) {
-                // we have seen a previous change from another cluster node
-                // (which might be conflicting or not) - we need to make
-                // sure this change is visible from now on
-                // TODO verify this is really needed
-                publishRevision(propRev, changeRev);
-            }
-            if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
-                if (!propRev.equals(changeRev)) {
-                    if (!isValidRevision(
-                            propRev, changeRev, doc, new HashSet<Revision>())) {
-                        handler.concurrentModification(propRev);
-                    } else {
-                        newestRev = propRev;
-                    }
-                }
-            }
-        }
-        if (newestRev == null) {
-            return null;
-        }
-        if (deletedMap != null) {
-            String value = deletedMap.get(newestRev.toString());
-            if ("true".equals(value)) {
-                // deleted in the newest revision
-                return null;
-            }
-        }
-        return newestRev;
-    }
-    
-    /**
-     * Ensure the revision visible from now on, possibly by updating the head
-     * revision, so that the changes that occurred are visible.
-     * 
-     * @param foreignRevision the revision from another cluster node
-     * @param changeRevision the local revision that is sorted after the foreign revision
-     */
-    private void publishRevision(Revision foreignRevision, Revision changeRevision) {  
-        if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
-            // already visible
-            return;
-        }
-        int clusterNodeId = foreignRevision.getClusterId();
-        if (clusterNodeId == this.clusterId) {
-            return;
-        }
-        // the (old) head occurred first
-        Revision headSeen = Revision.newRevision(0);
-        // then we saw this new revision (from another cluster node) 
-        Revision otherSeen = Revision.newRevision(0);
-        // and after that, the current change
-        Revision changeSeen = Revision.newRevision(0);
-        revisionComparator.add(foreignRevision, otherSeen);
-        // TODO invalidating the whole cache is not really needed,
-        // but how to ensure we invalidate the right part of the cache?
-        // possibly simply wait for the background thread to pick
-        // up the changes, but this depends on how often this method is called
-        store.invalidateCache();
-        // the latest revisions of the current cluster node
-        // happened before the latest revisions of other cluster nodes
-        revisionComparator.add(headRevision, headSeen);
-        revisionComparator.add(changeRevision, changeSeen);
-        // the head revision is after other revisions
-        headRevision = Revision.newRevision(clusterId);
-    }
-    
     private static String stripBranchRevMarker(String revisionId) {
         if (revisionId.startsWith("b")) {
             return revisionId.substring(1);
@@ -1450,8 +1074,8 @@ public class MongoMK implements MicroKer
         Commit.setModified(op, mergeCommit);
         if (b != null) {
             for (Revision rev : b.getCommits()) {
-                op.setMapEntry(UpdateOp.REVISIONS, rev.toString(), "c-" + mergeCommit.toString());
-                op.containsMapEntry(UpdateOp.COLLISIONS, rev.toString(), false);
+                op.setMapEntry(NodeDocument.REVISIONS, rev.toString(), "c-" + mergeCommit.toString());
+                op.containsMapEntry(NodeDocument.COLLISIONS, rev.toString(), false);
             }
             if (store.findAndUpdate(Collection.NODES, op) != null) {
                 // remove from branchCommits map after successful update
@@ -1617,10 +1241,6 @@ public class MongoMK implements MicroKer
         stopBackground = true;
     }
     
-    RevisionComparator getRevisionComparator() {
-        return revisionComparator;
-    }
-    
     /**
      * A background thread.
      */

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java Thu Aug 15 19:46:28 2013
@@ -78,7 +78,7 @@ public class Node implements CacheValue 
         UpdateOp op = new UpdateOp(path, id, isNew);
         op.set(Document.ID, id);
         Commit.setModified(op, rev);
-        op.setMapEntry(UpdateOp.DELETED, rev.toString(), "false");
+        op.setMapEntry(NodeDocument.DELETED, rev.toString(), "false");
         for (String p : properties.keySet()) {
             String key = Utils.escapePropertyName(p);
             op.setMapEntry(key, rev.toString(), properties.get(p));

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java Thu Aug 15 19:46:28 2013
@@ -16,24 +16,542 @@
  */
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import org.apache.jackrabbit.oak.cache.CacheValue;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * A document storing data about a node.
  */
 public class NodeDocument extends Document implements CacheValue {
 
+    private static final Logger log = LoggerFactory.getLogger(NodeDocument.class);
+
     private static final long serialVersionUID = 6713219541688419314L;
 
+    /**
+     * Marker document, which indicates the document does not exist.
+     */
+    public static final NodeDocument NULL = new NodeDocument();
+
+    /**
+     * The list of revision to root commit depth mappings to find out if a
+     * revision is actually committed.
+     */
+    static final String COMMIT_ROOT = "_commitRoot";
+
+    /**
+     * The number of previous documents (documents that contain old revisions of
+     * this node). This property is only set if multiple documents per node
+     * exist. This is the case when a node is updated very often in a short
+     * time, such that the document gets very big.
+     */
+    static final String PREVIOUS = "_prev";
+
+    /**
+     * Whether this node is deleted. Key: revision, value: true/false.
+     */
+    static final String DELETED = "_deleted";
+
+    /**
+     * Revision collision markers set by commits with modifications, which
+     * overlap with un-merged branch commits.
+     * Key: revision, value:
+     */
+    static final String COLLISIONS = "_collisions";
+
+    /**
+     * The modified time (5 second resolution).
+     */
+    static final String MODIFIED = "_modified";
+
+    /**
+     * The list of recent revisions for this node, where this node is the
+     * root of the commit. Key: revision, value: true or the base revision of an
+     * un-merged branch commit.
+     */
+    static final String REVISIONS = "_revisions";
+
+    /**
+     * The last revision. Key: machine id, value: revision.
+     */
+    static final String LAST_REV = "_lastRev";
+
     private final long time = System.currentTimeMillis();
 
+    /**
+     * @return the system time this object was created.
+     */
     public final long getCreated() {
         return time;
     }
 
+    /**
+     * @return a map of the last known revision for each clusterId.
+     */
+    @Nonnull
+    public Map<Integer, Revision> getLastRev() {
+        Map<Integer, Revision> map = Maps.newHashMap();
+        @SuppressWarnings("unchecked")
+        Map<String, String> valueMap = (Map<String, String>) get(LAST_REV);
+        if (valueMap != null) {
+            for (Map.Entry<String, String> e : valueMap.entrySet()) {
+                int clusterId = Integer.parseInt(e.getKey());
+                Revision rev = Revision.fromString(e.getValue());
+                map.put(clusterId, rev);
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Returns <code>true</code> if the given <code>revision</code> is marked
+     * committed in <strong>this</strong> document.
+     *
+     * @param revision the revision.
+     * @return <code>true</code> if committed; <code>false</code> otherwise.
+     */
+    public boolean isCommitted(@Nonnull String revision) {
+        checkNotNull(revision);
+        @SuppressWarnings("unchecked")
+        Map<String, String> revisions = (Map<String, String>) get(REVISIONS);
+        return revisions != null && Utils.isCommitted(revisions.get(revision));
+    }
+
+    /**
+     * Returns <code>true</code> if this document contains an entry for the
+     * given <code>revision</code> in the {@link #REVISIONS} map. Please note
+     * that an entry in the {@link #REVISIONS} map does not necessarily mean
+     * the revision is committed. Use {@link #isCommitted(String)} to get
+     * the commit state of a revision.
+     *
+     * @param revision the revision to check.
+     * @return <code>true</code> if this document contains the given revision.
+     */
+    public boolean containsRevision(@Nonnull String revision) {
+        checkNotNull(revision);
+        @SuppressWarnings("unchecked")
+        Map<String, String> revisions = (Map<String, String>) get(REVISIONS);
+        return revisions != null && revisions.containsKey(revision);
+    }
+
+    /**
+     * Get the revision of the latest change made to this node.
+     *
+     * @param changeRev the revision of the current change
+     * @param handler the conflict handler, which is called for concurrent changes
+     *                preceding <code>changeRev</code>.
+     * @return the revision, or null if deleted
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable
+    public Revision getNewestRevision(RevisionContext context,
+                                      DocumentStore store,
+                                      Revision changeRev,
+                                      CollisionHandler handler) {
+        SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
+        if (containsKey(REVISIONS)) {
+            revisions.addAll(((Map<String, String>) get(REVISIONS)).keySet());
+        }
+        if (containsKey(COMMIT_ROOT)) {
+            revisions.addAll(((Map<String, Integer>) get(COMMIT_ROOT)).keySet());
+        }
+        Map<String, String> deletedMap = (Map<String, String>)get(DELETED);
+        if (deletedMap != null) {
+            revisions.addAll(deletedMap.keySet());
+        }
+        Revision newestRev = null;
+        for (String r : revisions) {
+            Revision propRev = Revision.fromString(r);
+            if (isRevisionNewer(context, propRev, changeRev)) {
+                // we have seen a previous change from another cluster node
+                // (which might be conflicting or not) - we need to make
+                // sure this change is visible from now on
+                // TODO verify this is really needed
+                context.publishRevision(propRev, changeRev);
+            }
+            if (newestRev == null || isRevisionNewer(context, propRev, newestRev)) {
+                if (!propRev.equals(changeRev)) {
+                    if (!isValidRevision(context, store,
+                            propRev, changeRev, new HashSet<Revision>())) {
+                        handler.concurrentModification(propRev);
+                    } else {
+                        newestRev = propRev;
+                    }
+                }
+            }
+        }
+        if (newestRev == null) {
+            return null;
+        }
+        if (deletedMap != null) {
+            String value = deletedMap.get(newestRev.toString());
+            if ("true".equals(value)) {
+                // deleted in the newest revision
+                return null;
+            }
+        }
+        return newestRev;
+    }
+
+    /**
+     * Checks if the revision is valid for the given document. A revision is
+     * considered valid if the given document is the root of the commit, or the
+     * commit root has the revision set. This method may read further documents
+     * to perform this check.
+     * This method also takes pending branches into consideration.
+     * The <code>readRevision</code> identifies the read revision used by the
+     * client, which may be a branch revision logged in {@link RevisionContext#getBranches()}.
+     * The revision <code>rev</code> is valid if it is part of the branch
+     * history of <code>readRevision</code>.
+     *
+     * @param rev     revision to check.
+     * @param readRevision the read revision of the client.
+     * @param validRevisions set of revisions already checked against
+     *                       <code>readRevision</code> and considered valid.
+     * @return <code>true</code> if the revision is valid; <code>false</code>
+     *         otherwise.
+     */
+    boolean isValidRevision(@Nonnull RevisionContext context,
+                            @Nonnull DocumentStore store,
+                            @Nonnull Revision rev,
+                            @Nonnull Revision readRevision,
+                            @Nonnull Set<Revision> validRevisions) {
+        if (validRevisions.contains(rev)) {
+            return true;
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, String> revisions = (Map<String, String>) get(NodeDocument.REVISIONS);
+        if (isCommitted(context, rev, readRevision, revisions)) {
+            validRevisions.add(rev);
+            return true;
+        } else if (revisions != null && revisions.containsKey(rev.toString())) {
+            // rev is in revisions map of this node, but not committed
+            // no need to check _commitRoot field
+            return false;
+        }
+        // check commit root
+        @SuppressWarnings("unchecked")
+        Map<String, Integer> commitRoot = (Map<String, Integer>) get(NodeDocument.COMMIT_ROOT);
+        String commitRootPath = null;
+        if (commitRoot != null) {
+            Integer depth = commitRoot.get(rev.toString());
+            if (depth != null) {
+                String p = Utils.getPathFromId(getId());
+                commitRootPath = PathUtils.getAncestorPath(p, PathUtils.getDepth(p) - depth);
+            }
+        }
+        if (commitRootPath == null) {
+            // shouldn't happen, either node is commit root for a revision
+            // or has a reference to the commit root
+            log.warn("Node {} does not have commit root reference for revision {}",
+                    getId(), rev);
+            return false;
+        }
+        // get root of commit
+        NodeDocument doc = store.find(Collection.NODES,
+                Utils.getIdFromPath(commitRootPath));
+        if (doc == null) {
+            return false;
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, String> rootRevisions = (Map<String, String>) doc.get(NodeDocument.REVISIONS);
+        if (isCommitted(context, rev, readRevision, rootRevisions)) {
+            validRevisions.add(rev);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Returns a {@link Node} as seen at the given <code>readRevision</code>.
+     *
+     * @param context      the revision context.
+     * @param readRevision the read revision.
+     * @return the node or <code>null</code> if the node doesn't exist at the
+     *         given read revision.
+     */
+    @CheckForNull
+    public Node getNodeAtRevision(RevisionContext context,
+                                  DocumentStore store,
+                                  Revision readRevision) {
+        Revision min = getLiveRevision(context, store, readRevision, new HashSet<Revision>());
+        if (min == null) {
+            // deleted
+            return null;
+        }
+        String path = Utils.getPathFromId(getId());
+        Node n = new Node(path, readRevision);
+        for (String key : keySet()) {
+            if (!Utils.isPropertyName(key)) {
+                continue;
+            }
+            Object v = get(key);
+            @SuppressWarnings("unchecked")
+            Map<String, String> valueMap = (Map<String, String>) v;
+            if (valueMap != null) {
+                if (valueMap instanceof TreeMap) {
+                    // TODO instanceof should be avoided
+                    // use descending keys (newest first) if map is sorted
+                    valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
+                }
+                String value = getLatestValue(context, valueMap, min, readRevision);
+                String propertyName = Utils.unescapePropertyName(key);
+                n.setProperty(propertyName, value);
+            }
+        }
+
+        // when was this node last modified?
+        Branch branch = context.getBranches().getBranch(readRevision);
+        Revision lastRevision = null;
+        Map<Integer, Revision> lastRevs = Maps.newHashMap(getLastRev());
+        // overlay with unsaved last modified from this instance
+        Revision lastModified = context.getPendingModifications().get(path);
+        if (lastModified != null) {
+            lastRevs.put(context.getClusterId(), lastModified);
+        }
+        // filter out revisions newer than branch base
+        if (branch != null) {
+            Revision base = branch.getBase(readRevision);
+            for (Iterator<Revision> it = lastRevs.values().iterator(); it
+                    .hasNext();) {
+                Revision r = it.next();
+                if (isRevisionNewer(context, r, base)) {
+                    it.remove();
+                }
+            }
+        }
+        for (Revision r : lastRevs.values()) {
+            // ignore if newer than readRevision
+            if (isRevisionNewer(context, r, readRevision)) {
+                continue;
+            }
+            if (lastRevision == null || isRevisionNewer(context, r, lastRevision)) {
+                lastRevision = r;
+            }
+        }
+        if (branch != null) {
+            // read from a branch
+            // -> possibly overlay with unsaved last revs from branch
+            Revision r = branch.getUnsavedLastRevision(path, readRevision);
+            if (r != null) {
+                lastRevision = r;
+            }
+        }
+        if (lastRevision == null) {
+            // use readRevision if none found
+            lastRevision = readRevision;
+        }
+        n.setLastRevision(lastRevision);
+        return n;
+    }
+
+    /**
+     * Get the earliest (oldest) revision where the node was alive at or before
+     * the provided revision, if the node was alive at the given revision.
+     *
+     * @param maxRev the maximum revision to return
+     * @param validRevisions the set of revisions already checked against maxRev
+     *            and considered valid.
+     * @return the earliest revision, or null if the node is deleted at the
+     *         given revision
+     */
+    public Revision getLiveRevision(RevisionContext context,
+                                    DocumentStore store,
+                                    Revision maxRev,
+                                    Set<Revision> validRevisions) {
+        @SuppressWarnings("unchecked")
+        Map<String, String> valueMap = (Map<String, String>) get(NodeDocument.DELETED);
+        if (valueMap == null) {
+            return null;
+        }
+        // first, search the newest deleted revision
+        Revision deletedRev = null;
+        if (valueMap instanceof TreeMap) {
+            // TODO instanceof should be avoided
+            // use descending keys (newest first) if map is sorted
+            valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
+        }
+        for (String r : valueMap.keySet()) {
+            String value = valueMap.get(r);
+            if (!"true".equals(value)) {
+                // only look at deleted revisions now
+                continue;
+            }
+            Revision propRev = Revision.fromString(r);
+            if (isRevisionNewer(context, propRev, maxRev)
+                    || !isValidRevision(context, store, propRev, maxRev, validRevisions)) {
+                continue;
+            }
+            if (deletedRev == null || isRevisionNewer(context, propRev, deletedRev)) {
+                deletedRev = propRev;
+            }
+        }
+        // now search the oldest non-deleted revision that is newer than the
+        // newest deleted revision
+        Revision liveRev = null;
+        for (String r : valueMap.keySet()) {
+            String value = valueMap.get(r);
+            if ("true".equals(value)) {
+                // ignore deleted revisions
+                continue;
+            }
+            Revision propRev = Revision.fromString(r);
+            if (deletedRev != null && isRevisionNewer(context, deletedRev, propRev)) {
+                // the node was deleted later on
+                continue;
+            }
+            if (isRevisionNewer(context, propRev, maxRev)
+                    || !isValidRevision(context, store, propRev, maxRev, validRevisions)) {
+                continue;
+            }
+            if (liveRev == null || isRevisionNewer(context, liveRev, propRev)) {
+                liveRev = propRev;
+            }
+        }
+        return liveRev;
+    }
+
+    //-----------------------------< CacheValue >-------------------------------
+
     @Override
     public int getMemory() {
         return Utils.estimateMemoryUsage(this);
     }
+
+    //----------------------------< internal >----------------------------------
+
+    /**
+     * Checks that revision x is newer than another revision.
+     *
+     * @param x the revision to check
+     * @param previous the presumed earlier revision
+     * @return true if x is newer
+     */
+    private static boolean isRevisionNewer(@Nonnull RevisionContext context,
+                                           @Nonnull Revision x,
+                                           @Nonnull Revision previous) {
+        return context.getRevisionComparator().compare(x, previous) > 0;
+    }
+
+    /**
+     * TODO: turn into instance method?
+     * Returns <code>true</code> if the given revision is marked as
+     * {@link Utils#isCommitted(String) committed} in the revisions map and is
+     * visible from the <code>readRevision</code>.
+     *
+     * @param revision  the revision to check.
+     * @param readRevision the read revision.
+     * @param revisions the revisions map, or <code>null</code> if none is set.
+     * @return <code>true</code> if the revision is committed, otherwise
+     *         <code>false</code>.
+     */
+    private static boolean isCommitted(@Nonnull RevisionContext context,
+                                       @Nonnull Revision revision,
+                                       @Nonnull Revision readRevision,
+                                       @Nullable Map<String, String> revisions) {
+        if (revision.equals(readRevision)) {
+            return true;
+        }
+        if (revisions == null) {
+            return false;
+        }
+        String value = revisions.get(revision.toString());
+        if (value == null) {
+            return false;
+        }
+        if (Utils.isCommitted(value)) {
+            // resolve commit revision
+            revision = Utils.resolveCommitRevision(revision, value);
+            if (context.getBranches().getBranch(readRevision) == null) {
+                // readRevision is not from a branch
+                // compare resolved revision as is
+                return !isRevisionNewer(context, revision, readRevision);
+            }
+        } else {
+            // branch commit
+            if (Revision.fromString(value).getClusterId() != context.getClusterId()) {
+                // this is an unmerged branch commit from another cluster node,
+                // hence never visible to us
+                return false;
+            }
+        }
+        return includeRevision(context, revision, readRevision);
+    }
+
+    private static boolean includeRevision(RevisionContext context,
+                                    Revision x,
+                                    Revision requestRevision) {
+        Branch b = context.getBranches().getBranch(x);
+        if (b != null) {
+            // only include if requested revision is also a branch revision
+            // with a history including x
+            if (b.containsCommit(requestRevision)) {
+                // in same branch, include if the same revision or
+                // requestRevision is newer
+                return x.equals(requestRevision) || isRevisionNewer(context, requestRevision, x);
+            }
+            // not part of branch identified by requestedRevision
+            return false;
+        }
+        // assert: x is not a branch commit
+        b = context.getBranches().getBranch(requestRevision);
+        if (b != null) {
+            // reset requestRevision to branch base revision to make
+            // sure we don't include revisions committed after branch
+            // was created
+            requestRevision = b.getBase(requestRevision);
+        }
+        return context.getRevisionComparator().compare(requestRevision, x) >= 0;
+    }
+
+    /**
+     * Get the latest property value with a revision greater than or equal to
+     * the min revision and less than or equal to the max revision.
+     *
+     * @param valueMap the revision-value map
+     * @param min the minimum revision (null meaning unlimited)
+     * @param max the maximum revision
+     * @return the value, or null if not found
+     */
+    private String getLatestValue(@Nonnull RevisionContext context,
+                                  @Nonnull Map<String, String> valueMap,
+                                  @Nullable Revision min,
+                                  @Nonnull Revision max) {
+        String value = null;
+        Revision latestRev = null;
+        for (String r : valueMap.keySet()) {
+            Revision propRev = Revision.fromString(r);
+            if (min != null && isRevisionNewer(context, min, propRev)) {
+                continue;
+            }
+            if (latestRev != null && !isRevisionNewer(context, propRev, latestRev)) {
+                continue;
+            }
+            if (includeRevision(context, propRev, max)) {
+                latestRev = propRev;
+                value = valueMap.get(r);
+            }
+        }
+        return value;
+    }
 }

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java?rev=1514457&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java Thu Aug 15 19:46:28 2013
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+/**
+ * Provides revision related context.
+ */
+public interface RevisionContext {
+
+    /**
+     * @return the branches of the local MongoMK instance, which are not yet
+     *         merged.
+     */
+    public UnmergedBranches getBranches();
+
+    /**
+     * @return the pending modifications.
+     */
+    public UnsavedModifications getPendingModifications();
+
+    /**
+     * @return the revision comparator.
+     */
+    public Revision.RevisionComparator getRevisionComparator();
+
+    /**
+     * Ensure the revision is visible from now on, possibly by updating the
+     * head revision, so that the changes that occurred are visible.
+     *
+     * @param foreignRevision the revision from another cluster node
+     * @param changeRevision the local revision that is sorted after the foreign revision
+     */
+    public void publishRevision(Revision foreignRevision, Revision changeRevision);
+
+    /**
+     * @return the cluster id of the local MongoMK instance.
+     */
+    public int getClusterId();
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java Thu Aug 15 19:46:28 2013
@@ -70,7 +70,7 @@ class UnmergedBranches {
         }
         NodeDocument doc = store.find(Collection.NODES, Utils.getIdFromPath("/"));
         @SuppressWarnings("unchecked")
-        Map<String, String> valueMap = (Map<String, String>) doc.get(UpdateOp.REVISIONS);
+        Map<String, String> valueMap = (Map<String, String>) doc.get(NodeDocument.REVISIONS);
         if (valueMap != null) {
             SortedMap<Revision, Revision> tmp =
                     new TreeMap<Revision, Revision>(comparator);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UpdateOp.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UpdateOp.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UpdateOp.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UpdateOp.java Thu Aug 15 19:46:28 2013
@@ -25,49 +25,6 @@ import java.util.TreeMap;
  */
 public class UpdateOp {
 
-    /**
-     * The last revision. Key: machine id, value: revision.
-     */
-    static final String LAST_REV = "_lastRev";
-    
-    /**
-     * The list of recent revisions for this node, where this node is the
-     * root of the commit. Key: revision, value: true or the base revision of an
-     * un-merged branch commit.
-     */
-    static final String REVISIONS = "_revisions";
-
-    /**
-     * The list of revision to root commit depth mappings to find out if a
-     * revision is actually committed.
-     */
-    static final String COMMIT_ROOT = "_commitRoot";
-
-    /**
-     * The number of previous documents (documents that contain old revisions of
-     * this node). This property is only set if multiple documents per node
-     * exist. This is the case when a node is updated very often in a short
-     * time, such that the document gets very big.
-     */
-    static final String PREVIOUS = "_prev";
-    
-    /**
-     * Whether this node is deleted. Key: revision, value: true/false.
-     */
-    static final String DELETED = "_deleted";
-
-    /**
-     * Revision collision markers set by commits with modifications, which
-     * overlap with un-merged branch commits.
-     * Key: revision, value:
-     */
-    static final String COLLISIONS = "_collisions";
-
-    /**
-     * The modified time (5 second resolution).
-     */
-    static final String MODIFIED = "_modified";
-    
     final String path;
     
     final String key;

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java?rev=1514457&r1=1514456&r2=1514457&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java Thu Aug 15 19:46:28 2013
@@ -76,7 +76,7 @@ public class SimpleTest {
         n.setProperty("name", "Hello");
         UpdateOp op = n.asOperation(true);
         // mark as commit root
-        op.setMapEntry(UpdateOp.REVISIONS, rev.toString(), "true");
+        op.setMapEntry(NodeDocument.REVISIONS, rev.toString(), "true");
         DocumentStore s = mk.getDocumentStore();
         assertTrue(s.create(Collection.NODES, Lists.newArrayList(op)));
         Node n2 = mk.getNode("/test", rev);
@@ -384,32 +384,32 @@ public class SimpleTest {
             head = mk.commit("", "+\"/test\":{\"foo\":{}}", head, null);
 
             // root node must not have the revision
-            Map<String, Object> rootNode = store.find(Collection.NODES, "0:/");
-            assertFalse(((Map<?, ?>) rootNode.get(UpdateOp.REVISIONS)).containsKey(head));
+            NodeDocument rootDoc = store.find(Collection.NODES, "0:/");
+            assertFalse(rootDoc.containsRevision(head));
 
             // test node must have head in revisions
-            Map<String, Object> node = store.find(Collection.NODES, "1:/test");
-            assertTrue(((Map<?, ?>) node.get(UpdateOp.REVISIONS)).containsKey(head));
+            NodeDocument node = store.find(Collection.NODES, "1:/test");
+            assertTrue(node.containsRevision(head));
 
             // foo must not have head in revisions and must refer to test
             // as commit root (depth = 1)
-            Map<String, Object> foo = store.find(Collection.NODES, "2:/test/foo");
-            assertTrue(foo.get(UpdateOp.REVISIONS) == null);
-            assertEquals(1, ((Map<?, ?>) foo.get(UpdateOp.COMMIT_ROOT)).get(head));
+            NodeDocument foo = store.find(Collection.NODES, "2:/test/foo");
+            assertTrue(foo.get(NodeDocument.REVISIONS) == null);
+            assertEquals(1, ((Map<?, ?>) foo.get(NodeDocument.COMMIT_ROOT)).get(head));
 
             head = mk.commit("", "+\"/bar\":{}+\"/test/foo/bar\":{}", head, null);
 
             // root node is root of commit
-            rootNode = store.find(Collection.NODES, "0:/");
-            assertTrue(((Map<?, ?>) rootNode.get(UpdateOp.REVISIONS)).containsKey(head));
+            rootDoc = store.find(Collection.NODES, "0:/");
+            assertTrue(((Map<?, ?>) rootDoc.get(NodeDocument.REVISIONS)).containsKey(head));
 
             // /bar refers to root nodes a commit root
-            Map<String, Object> bar = store.find(Collection.NODES, "1:/bar");
-            assertEquals(0, ((Map<?, ?>) bar.get(UpdateOp.COMMIT_ROOT)).get(head));
+            NodeDocument bar = store.find(Collection.NODES, "1:/bar");
+            assertEquals(0, ((Map<?, ?>) bar.get(NodeDocument.COMMIT_ROOT)).get(head));
 
             // /test/foo/bar refers to root nodes a commit root
             bar = store.find(Collection.NODES, "3:/test/foo/bar");
-            assertEquals(0, ((Map<?, ?>) bar.get(UpdateOp.COMMIT_ROOT)).get(head));
+            assertEquals(0, ((Map<?, ?>) bar.get(NodeDocument.COMMIT_ROOT)).get(head));
 
         } finally {
             mk.dispose();



Mime
View raw message