jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r1521438 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/mongomk/ main/java/org/apache/jackrabbit/oak/plugins/mongomk/util/ test/java/org/apache/jackrabbit/oak/plugins/mongomk/ test/java/org/apache/jackr...
Date Tue, 10 Sep 2013 11:37:48 GMT
Author: mreutegg
Date: Tue Sep 10 11:37:48 2013
New Revision: 1521438

URL: http://svn.apache.org/r1521438
Log:
OAK-926: MongoMK: split documents when they are too large (work in progress)
- split off REVISIONS as a first step

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/util/
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/util/UtilsTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collection.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/util/Utils.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/BaseMongoMKTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collection.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collection.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collection.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collection.java Tue Sep 10 11:37:48 2013
@@ -37,8 +37,9 @@ public abstract class Collection<T exten
     public static final Collection<NodeDocument> NODES =
             new Collection<NodeDocument>("nodes") {
                 @Override
-                public NodeDocument newDocument() {
-                    return new NodeDocument();
+                @Nonnull
+                public NodeDocument newDocument(DocumentStore store) {
+                    return new NodeDocument(store);
                 }
             };
 
@@ -49,7 +50,8 @@ public abstract class Collection<T exten
     public static final Collection<ClusterNodeInfoDocument> CLUSTER_NODES =
             new Collection<ClusterNodeInfoDocument>("clusterNodes") {
                 @Override
-                public ClusterNodeInfoDocument newDocument() {
+                @Nonnull
+                public ClusterNodeInfoDocument newDocument(DocumentStore store) {
                     return new ClusterNodeInfoDocument();
                 }
             };
@@ -66,8 +68,9 @@ public abstract class Collection<T exten
     }
 
     /**
+     * @param store the document store.
      * @return a new document for this collection.
      */
     @Nonnull
-    public abstract T newDocument();
+    public abstract T newDocument(DocumentStore store);
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java Tue Sep 10 11:37:48 2013
@@ -44,18 +44,18 @@ class Collision {
     private static final Logger LOG = LoggerFactory.getLogger(Collision.class);
 
     private final NodeDocument document;
-    private final String theirRev;
+    private final Revision theirRev;
     private final UpdateOp ourOp;
-    private final String ourRev;
+    private final Revision ourRev;
 
     Collision(@Nonnull NodeDocument document,
               @Nonnull Revision theirRev,
               @Nonnull UpdateOp ourOp,
               @Nonnull Revision ourRev) {
         this.document = checkNotNull(document);
-        this.theirRev = checkNotNull(theirRev).toString();
+        this.theirRev = checkNotNull(theirRev);
         this.ourOp = checkNotNull(ourOp);
-        this.ourRev = checkNotNull(ourRev).toString();
+        this.ourRev = checkNotNull(ourRev);
     }
 
     /**
@@ -72,7 +72,7 @@ class Collision {
             return;
         }
         // their commit wins, we have to mark ourRev
-        NodeDocument newDoc = Collection.NODES.newDocument();
+        NodeDocument newDoc = Collection.NODES.newDocument(store);
         document.deepCopy(newDoc);
         MemoryDocumentStore.applyChanges(newDoc, ourOp);
         if (!markCommitRoot(newDoc, ourRev, store)) {
@@ -94,8 +94,9 @@ class Collision {
      *         successfully; <code>false</code> otherwise.
      */
     private static boolean markCommitRoot(@Nonnull NodeDocument document,
-                                          @Nonnull String revision,
+                                          @Nonnull Revision revision,
                                           @Nonnull DocumentStore store) {
+        String rev = revision.toString();
         String p = Utils.getPathFromId(document.getId());
         String commitRootPath = null;
         // first check if we can mark the commit with the given revision
@@ -109,9 +110,9 @@ class Collision {
             commitRootPath = p;
         } else {
             // next look at commit root
-            commitRootPath = document.getCommitRootPath(revision);
+            commitRootPath = document.getCommitRootPath(rev);
             if (commitRootPath == null) {
-                throwNoCommitRootException(revision, document);
+                throwNoCommitRootException(rev, document);
             }
         }
         // at this point we have a commitRootPath
@@ -121,7 +122,7 @@ class Collision {
         if (document.isCommitted(revision)) {
             return false;
         }
-        op.setMapEntry(NodeDocument.COLLISIONS, revision, true);
+        op.setMapEntry(NodeDocument.COLLISIONS, rev, true);
         document = store.createOrUpdate(Collection.NODES, op);
         // check again on old document right before our update was applied
         if (document.isCommitted(revision)) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java Tue Sep 10 11:37:48 2013
@@ -35,29 +35,15 @@ import org.apache.jackrabbit.oak.commons
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument.SPLIT_CANDIDATE_THRESHOLD;
+
 /**
  * A higher level object representing a commit.
  */
 public class Commit {
 
-    /**
-     * Whether to purge old revisions if a node gets too large. If false, old
-     * revisions are stored in a separate document. If true, old revisions are
-     * removed (purged).
-     * TODO: enable once document split and garbage collection implementation is complete.
-     */
-    static final boolean PURGE_OLD_REVISIONS = false;
-    
     private static final Logger LOG = LoggerFactory.getLogger(Commit.class);
 
-    /**
-     * The maximum size of a document. If it is larger, it is split.
-     * TODO: check which value is the best one
-     *       Document splitting is currently disabled until the implementation
-     *       is complete.
-     */
-    private static final int MAX_DOCUMENT_SIZE = Integer.MAX_VALUE;
-   
     private final MongoMK mk;
     private final Revision baseRevision;
     private final Revision revision;
@@ -221,7 +207,7 @@ public class Commit {
                 NodeDocument.setLastRev(op, revision);
             }
             if (op.isNew) {
-                op.setMapEntry(NodeDocument.DELETED, revision.toString(), "false");
+                NodeDocument.setDeleted(op, revision, false);
             }
             if (op == commitRoot) {
                 // apply at the end
@@ -306,7 +292,7 @@ public class Commit {
             final AtomicReference<List<Revision>> collisions = new AtomicReference<List<Revision>>();
             Revision newestRev = null;
             if (doc != null) {
-                newestRev = doc.getNewestRevision(mk, store, revision,
+                newestRev = doc.getNewestRevision(mk, revision,
                         new CollisionHandler() {
                             @Override
                             void concurrentModification(Revision other) {
@@ -353,23 +339,8 @@ public class Commit {
             }
         }
 
-        if (doc != null && doc.getMemory() > MAX_DOCUMENT_SIZE) {
-            UpdateOp[] split = doc.splitDocument(mk, revision, mk.getSplitDocumentAgeMillis());
-            
-            // TODO check if the new main document is actually smaller;
-            // otherwise, splitting doesn't make sense
-            
-            // the old version
-            UpdateOp old = split[0];
-            if (old != null) {
-                store.createOrUpdate(Collection.NODES, old);
-            }
-            
-            // the (shrunken) main document
-            UpdateOp main = split[1];
-            if (main != null) {
-                store.createOrUpdate(Collection.NODES, main);
-            }
+        if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {
+            mk.addSplitCandidate(doc.getId());
         }
     }
 
@@ -392,44 +363,7 @@ public class Commit {
             // or document did not exist before
             return false;
         }
-        // did existence of node change after baseRevision?
-        @SuppressWarnings("unchecked")
-        Map<String, String> deleted = (Map<String, String>) doc.get(NodeDocument.DELETED);
-        if (deleted != null) {
-            for (Map.Entry<String, String> entry : deleted.entrySet()) {
-                if (mk.isRevisionNewer(Revision.fromString(entry.getKey()), baseRevision)) {
-                    return true;
-                }
-            }
-        }
-
-        for (Map.Entry<String, UpdateOp.Operation> entry : op.changes.entrySet()) {
-            if (entry.getValue().type != UpdateOp.Operation.Type.SET_MAP_ENTRY) {
-                continue;
-            }
-            int idx = entry.getKey().indexOf('.');
-            String name = entry.getKey().substring(0, idx);
-            if (NodeDocument.DELETED.equals(name)) {
-                // existence of node changed, this always conflicts with
-                // any other concurrent change
-                return true;
-            }
-            if (!Utils.isPropertyName(name)) {
-                continue;
-            }
-            // was this property touched after baseRevision?
-            @SuppressWarnings("unchecked")
-            Map<String, Object> changes = (Map<String, Object>) doc.get(name);
-            if (changes == null) {
-                continue;
-            }
-            for (String rev : changes.keySet()) {
-                if (mk.isRevisionNewer(Revision.fromString(rev), baseRevision)) {
-                    return true;
-                }
-            }
-        }
-        return false;
+        return doc.isConflicting(op, baseRevision, mk);
     }
 
     /**
@@ -507,7 +441,7 @@ public class Commit {
         removedNodes.add(path);
         UpdateOp op = getUpdateOperationForNode(path);
         op.setDelete(true);
-        op.setMapEntry(NodeDocument.DELETED, revision.toString(), "true");
+        NodeDocument.setDeleted(op, revision, true);
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Document.java Tue Sep 10 11:37:48 2013
@@ -18,15 +18,19 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.jackrabbit.oak.cache.CacheValue;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
 
+import com.google.common.collect.Maps;
+
 /**
  * A document corresponds to a node stored in the MongoMK. A document contains
  * all the revisions of a node stored in the {@link DocumentStore}.
@@ -42,7 +46,7 @@ public class Document implements CacheVa
     /**
      * The data of this document.
      */
-    protected Map<String, Object> data = new TreeMap<String, Object>();
+    protected Map<String, Object> data = Maps.newHashMap();
 
     /**
      * Whether this document is sealed (immutable data).
@@ -94,7 +98,14 @@ public class Document implements CacheVa
      */
     public void seal() {
         if (!sealed.getAndSet(true)) {
-            data = seal(data);
+            for (Map.Entry<String, Object> entry : data.entrySet()) {
+                if (entry.getValue() instanceof Map) {
+                    @SuppressWarnings("unchecked")
+                    Map<Object, Object> map = (Map<Object, Object>) entry.getValue();
+                    entry.setValue(transformAndSeal(map, entry.getKey(), 1));
+                }
+            }
+            data = Collections.unmodifiableMap(data);
         }
     }
 
@@ -123,17 +134,34 @@ public class Document implements CacheVa
         return Utils.estimateMemoryUsage(this.data);
     }
 
-    //------------------------------< internal >--------------------------------
-
-    private static Map<String, Object> seal(Map<String, Object> map) {
-        for (Map.Entry<String, Object> entry : map.entrySet()) {
+    /**
+     * Transform and seal the data of this document. That is, the data becomes
+     * immutable and transformation may be performed on the data.
+     *
+     * @param map the map to transform.
+     * @param key the key for the given map or <code>null</code> if the map
+     *            is the top level data map.
+     * @param level the level. Zero for the top level map, one for an entry in
+     *              the top level map, etc.
+     * @return the transformed and sealed map.
+     */
+    @Nonnull
+    protected Map<?, ?> transformAndSeal(@Nonnull Map<Object, Object> map,
+                                         @Nullable String key,
+                                         int level) {
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
             Object value = entry.getValue();
             if (value instanceof Map) {
                 @SuppressWarnings("unchecked")
-                Map<String, Object> childMap = (Map<String, Object>) value;
-                entry.setValue(seal(childMap));
+                Map<Object, Object> childMap = (Map<Object, Object>) value;
+                entry.setValue(transformAndSeal(
+                        childMap, entry.getKey().toString(), level + 1));
             }
         }
-        return Collections.unmodifiableMap(map);
+        if (map instanceof NavigableMap) {
+            return Maps.unmodifiableNavigableMap((NavigableMap<Object, Object>) map);
+        } else {
+            return Collections.unmodifiableMap(map);
+        }
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MemoryDocumentStore.java Tue Sep 10 11:37:48 2013
@@ -166,7 +166,7 @@ public class MemoryDocumentStore impleme
             // get the node if it's there
             oldDoc = map.get(update.key);
 
-            T doc = collection.newDocument();
+            T doc = collection.newDocument(this);
             if (oldDoc == null) {
                 if (!update.isNew) {
                     throw new MicroKernelException("Document does not exist: " + update.key);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java Tue Sep 10 11:37:48 2013
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionExc
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.oak.plugins.mongomk.UpdateOp.Operation;
@@ -176,7 +177,9 @@ public class MongoDocumentStore implemen
                 return null;
             }
             T doc = convertFromDBObject(collection, obj);
-            doc.seal();
+            if (doc != null) {
+                doc.seal();
+            }
             return doc;
         } finally {
             end("findUncached", start);
@@ -217,8 +220,8 @@ public class MongoDocumentStore implemen
             for (int i = 0; i < limit && cursor.hasNext(); i++) {
                 DBObject o = cursor.next();
                 T doc = convertFromDBObject(collection, o);
-                doc.seal();
-                if (collection == Collection.NODES) {
+                if (collection == Collection.NODES && doc != null) {
+                    doc.seal();
                     nodesCache.put(doc.getId(), (NodeDocument) doc);
                 }
                 list.add(doc);
@@ -322,19 +325,21 @@ public class MongoDocumentStore implemen
             if (checkConditions && oldNode == null) {
                 return null;
             }
-            T doc = convertFromDBObject(collection, oldNode);
+            T oldDoc = convertFromDBObject(collection, oldNode);
             
             // cache the new document
             if (collection == Collection.NODES) {
-                T newDoc = collection.newDocument();
-                doc.deepCopy(newDoc);
+                T newDoc = collection.newDocument(this);
+                if (oldDoc != null) {
+                    oldDoc.deepCopy(newDoc);
+                    oldDoc.seal();
+                }
                 String key = updateOp.getKey();
                 MemoryDocumentStore.applyChanges(newDoc, updateOp);
                 newDoc.seal();
                 nodesCache.put(key, (NodeDocument) newDoc);
             }
-            doc.seal();
-            return doc;
+            return oldDoc;
         } catch (Exception e) {
             throw new MicroKernelException(e);
         } finally {
@@ -370,7 +375,7 @@ public class MongoDocumentStore implemen
         for (int i = 0; i < updateOps.size(); i++) {
             inserts[i] = new BasicDBObject();
             UpdateOp update = updateOps.get(i);
-            T target = collection.newDocument();
+            T target = collection.newDocument(this);
             MemoryDocumentStore.applyChanges(target, update);
             docs.add(target);
             for (Entry<String, Operation> entry : update.changes.entrySet()) {
@@ -422,10 +427,12 @@ public class MongoDocumentStore implemen
         }        
     }
 
-    private static <T extends Document> T convertFromDBObject(Collection<T> collection,
-                                                              DBObject n) {
-        T copy = collection.newDocument();
+    @CheckForNull
+    private <T extends Document> T convertFromDBObject(@Nonnull Collection<T> collection,
+                                                       @Nullable DBObject n) {
+        T copy = null;
         if (n != null) {
+            copy = collection.newDocument(this);
             for (String key : n.keySet()) {
                 Object o = n.get(key);
                 if (o instanceof String) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Tue Sep 10 11:37:48 2013
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.Weigher;
+import com.google.common.collect.Maps;
 import com.mongodb.DB;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -181,6 +183,11 @@ public class MongoMK implements MicroKer
      * Key: path, value: revision.
      */
     private final UnsavedModifications unsavedLastRevisions = new UnsavedModifications();
+
+    /**
+     * Set of IDs for documents that may need to be split.
+     */
+    private final Map<String, String> splitCandidates = Maps.newConcurrentMap();
     
     /**
      * The last known revision for each cluster instance.
@@ -317,6 +324,7 @@ public class MongoMK implements MicroKer
         }
         synchronized (this) {
             try {
+                backgroundSplit();
                 backgroundWrite();
                 backgroundRead();
             } catch (RuntimeException e) {
@@ -372,7 +380,26 @@ public class MongoMK implements MicroKer
         }
         revisionComparator.purge(Revision.getCurrentTimestamp() - REMEMBER_REVISION_ORDER_MILLIS);
     }
-    
+
+    private void backgroundSplit() {
+        for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext(); ) {
+            String id = it.next();
+            NodeDocument doc = store.find(Collection.NODES, id);
+            if (doc == null) {
+                continue;
+            }
+            for (UpdateOp op : doc.split(this)) {
+                NodeDocument before = store.createOrUpdate(Collection.NODES, op);
+                if (before != null) {
+                    NodeDocument after = store.find(Collection.NODES, op.getKey());
+                    LOG.info("Split operation on {}. Size before: {}, after: {}",
+                            new Object[]{id, before.getMemory(), after.getMemory()});
+                }
+            }
+            it.remove();
+        }
+    }
+
     void backgroundWrite() {
         if (unsavedLastRevisions.getPaths().size() == 0) {
             return;
@@ -411,7 +438,7 @@ public class MongoMK implements MicroKer
             unsavedLastRevisions.remove(p);
         }
     }
-    
+
     public void dispose() {
         // force background write (with asyncDelay > 0, the root wouldn't be written)
         // TODO make this more obvious / explicit
@@ -455,6 +482,15 @@ public class MongoMK implements MicroKer
         }
         return node;
     }
+
+    /**
+     * Enqueue the document with the given id as a split candidate.
+     *
+     * @param id the id of the document to check if it needs to be split.
+     */
+    void addSplitCandidate(String id) {
+        splitCandidates.put(id, id);
+    }
     
     private void checkRevisionAge(Revision r, String path) {
         // TODO only log if there are new revisions available for the given node
@@ -526,7 +562,7 @@ public class MongoMK implements MicroKer
             Set<Revision> validRevisions = new HashSet<Revision>();
             for (NodeDocument doc : list) {
                 // filter out deleted children
-                if (doc.getLiveRevision(this, store, rev, validRevisions) == null) {
+                if (doc.isDeleted(this, rev, validRevisions)) {
                     continue;
                 }
                 String p = Utils.getPathFromId(doc.getId());
@@ -553,7 +589,7 @@ public class MongoMK implements MicroKer
         if (doc == null) {
             return null;
         }
-        return doc.getNodeAtRevision(this, store, readRevision);
+        return doc.getNodeAtRevision(this, readRevision);
     }
     
     @Override

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Node.java Tue Sep 10 11:37:48 2013
@@ -78,7 +78,7 @@ public class Node implements CacheValue 
         UpdateOp op = new UpdateOp(id, isNew);
         op.set(Document.ID, id);
         NodeDocument.setModified(op, rev);
-        op.setMapEntry(NodeDocument.DELETED, rev.toString(), "false");
+        NodeDocument.setDeleted(op, rev, false);
         for (String p : properties.keySet()) {
             String key = Utils.escapePropertyName(p);
             op.setMapEntry(key, rev.toString(), properties.get(p));

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java Tue Sep 10 11:37:48 2013
@@ -16,10 +16,16 @@
  */
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -35,8 +41,13 @@ import org.apache.jackrabbit.oak.plugins
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -46,10 +57,24 @@ public class NodeDocument extends Docume
 
     private static final Logger log = LoggerFactory.getLogger(NodeDocument.class);
 
+    private static final SortedMap<Revision, Range> EMPTY_RANGE_MAP
+            = Collections.unmodifiableSortedMap(new TreeMap<Revision, Range>());
+
+    /**
+     * A size threshold after which to consider a document a split candidate.
+     * TODO: check which value is the best one
+     */
+    static final int SPLIT_CANDIDATE_THRESHOLD = 32 * 1024;
+
+    /**
+     * Only split off at least this number of revisions.
+     */
+    static final int REVISIONS_SPLIT_OFF_SIZE = 1000;
+
     /**
      * Marker document, which indicates the document does not exist.
      */
-    public static final NodeDocument NULL = new NodeDocument();
+    public static final NodeDocument NULL = new NodeDocument(new MemoryDocumentStore());
 
     /**
      * The list of revision to root commit depth mappings to find out if a
@@ -68,7 +93,7 @@ public class NodeDocument extends Docume
     /**
      * Whether this node is deleted. Key: revision, value: true/false.
      */
-    static final String DELETED = "_deleted";
+    private static final String DELETED = "_deleted";
 
     /**
      * Revision collision markers set by commits with modifications, which
@@ -96,6 +121,12 @@ public class NodeDocument extends Docume
 
     private final long time = System.currentTimeMillis();
 
+    private final DocumentStore store;
+
+    NodeDocument(@Nonnull DocumentStore store) {
+        this.store = checkNotNull(store);
+    }
+
     /**
      * @return the system time this object was created.
      */
@@ -128,28 +159,52 @@ public class NodeDocument extends Docume
      * @param revision the revision.
      * @return <code>true</code> if committed; <code>false</code> otherwise.
      */
-    public boolean isCommitted(@Nonnull String revision) {
-        checkNotNull(revision);
-        @SuppressWarnings("unchecked")
-        Map<String, String> revisions = (Map<String, String>) get(REVISIONS);
-        return revisions != null && Utils.isCommitted(revisions.get(revision));
+    public boolean isCommitted(@Nonnull Revision revision) {
+        String rev = checkNotNull(revision).toString();
+        String value = getRevisionsMap().get(rev); // this document only; previous docs are searched below
+        if (value != null) {
+            return Utils.isCommitted(value);
+        }
+        // not in this document: check previous (split-off) docs
+        for (NodeDocument prev : getPreviousDocs(revision)) {
+            if (prev.containsRevision(revision)) {
+                return prev.isCommitted(revision);
+            }
+        }
+        return false;
     }
 
     /**
      * Returns <code>true</code> if this document contains an entry for the
      * given <code>revision</code> in the {@link #REVISIONS} map. Please note
      * that an entry in the {@link #REVISIONS} map does not necessarily mean
-     * the the revision is committed. Use {@link #isCommitted(String)} to get
+     * the revision is committed. Use {@link #isCommitted(Revision)} to get
      * the commit state of a revision.
      *
      * @param revision the revision to check.
      * @return <code>true</code> if this document contains the given revision.
      */
+    public boolean containsRevision(@Nonnull Revision revision) {
+        String rev = checkNotNull(revision).toString();
+        if (getRevisionsMap().containsKey(rev)) { // this document's own map
+            return true;
+        }
+        for (NodeDocument prev : getPreviousDocs(revision)) { // then split-off previous documents
+            if (prev.containsRevision(revision)) { // NOTE(review): getPreviousDocs(revision) already applies this check
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Same as {@link #containsRevision(Revision)}, but the revision is passed as a String.
+     *
+     * @param revision the revision, parsed with {@link Revision#fromString(String)}.
+     * @return <code>true</code> if this document contains the given revision.
+     */
     public boolean containsRevision(@Nonnull String revision) {
-        checkNotNull(revision);
-        @SuppressWarnings("unchecked")
-        Map<String, String> revisions = (Map<String, String>) get(REVISIONS);
-        return revisions != null && revisions.containsKey(revision);
+        return containsRevision(Revision.fromString(checkNotNull(revision))); // NOTE(review): unparsable strings now go through Revision.fromString instead of returning false — confirm callers
     }
 
     /**
@@ -162,18 +217,17 @@ public class NodeDocument extends Docume
      * @return the uncommitted revisions of this document.
      */
     public SortedMap<Revision, Revision> getUncommittedRevisions(RevisionContext context) {
-        @SuppressWarnings("unchecked")
-        Map<String, String> valueMap = (Map<String, String>) get(NodeDocument.REVISIONS);
+        // only look at revisions in this document.
+        // uncommitted revisions are not split off
+        Map<String, String> valueMap = getRevisionsMap();
         SortedMap<Revision, Revision> revisions =
                 new TreeMap<Revision, Revision>(context.getRevisionComparator());
-        if (valueMap != null) {
-            for (Map.Entry<String, String> commit : valueMap.entrySet()) {
-                if (!Utils.isCommitted(commit.getValue())) {
-                    Revision r = Revision.fromString(commit.getKey());
-                    if (r.getClusterId() == context.getClusterId()) {
-                        Revision b = Revision.fromString(commit.getValue());
-                        revisions.put(r, b);
-                    }
+        for (Map.Entry<String, String> commit : valueMap.entrySet()) {
+            if (!Utils.isCommitted(commit.getValue())) {
+                Revision r = Revision.fromString(commit.getKey());
+                if (r.getClusterId() == context.getClusterId()) {
+                    Revision b = Revision.fromString(commit.getValue());
+                    revisions.put(r, b);
                 }
             }
         }
@@ -213,15 +267,12 @@ public class NodeDocument extends Docume
      * @return the revision, or null if deleted
      */
     @SuppressWarnings("unchecked")
-    @Nullable
+    @CheckForNull
     public Revision getNewestRevision(RevisionContext context,
-                                      DocumentStore store,
                                       Revision changeRev,
                                       CollisionHandler handler) {
         SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
-        if (data.containsKey(REVISIONS)) {
-            revisions.addAll(((Map<String, String>) get(REVISIONS)).keySet());
-        }
+        revisions.addAll(getRevisions().keySet());
         if (data.containsKey(COMMIT_ROOT)) {
             revisions.addAll(((Map<String, Integer>) get(COMMIT_ROOT)).keySet());
         }
@@ -241,8 +292,7 @@ public class NodeDocument extends Docume
             }
             if (newestRev == null || isRevisionNewer(context, propRev, newestRev)) {
                 if (!propRev.equals(changeRev)) {
-                    if (!isValidRevision(context, store,
-                            propRev, changeRev, new HashSet<Revision>())) {
+                    if (!isValidRevision(context, propRev, changeRev, new HashSet<Revision>())) {
                         handler.concurrentModification(propRev);
                     } else {
                         newestRev = propRev;
@@ -282,22 +332,21 @@ public class NodeDocument extends Docume
      *         otherwise.
      */
     boolean isValidRevision(@Nonnull RevisionContext context,
-                            @Nonnull DocumentStore store,
                             @Nonnull Revision rev,
                             @Nonnull Revision readRevision,
                             @Nonnull Set<Revision> validRevisions) {
         if (validRevisions.contains(rev)) {
             return true;
         }
-        @SuppressWarnings("unchecked")
-        Map<String, String> revisions = (Map<String, String>) get(REVISIONS);
-        if (isCommitted(context, rev, readRevision, revisions)) {
-            validRevisions.add(rev);
-            return true;
-        } else if (revisions != null && revisions.containsKey(rev.toString())) {
-            // rev is in revisions map of this node, but not committed
-            // no need to check _commitRoot field
-            return false;
+        if (containsRevision(rev)) {
+            if (isCommitted(context, rev, readRevision)) {
+                validRevisions.add(rev);
+                return true;
+            } else {
+                // rev is in revisions map of this node, but not committed
+                // no need to check _commitRoot field
+                return false;
+            }
         }
         // check commit root
         @SuppressWarnings("unchecked")
@@ -323,9 +372,7 @@ public class NodeDocument extends Docume
         if (doc == null) {
             return false;
         }
-        @SuppressWarnings("unchecked")
-        Map<String, String> rootRevisions = (Map<String, String>) doc.get(REVISIONS);
-        if (isCommitted(context, rev, readRevision, rootRevisions)) {
+        if (doc.isCommitted(context, rev, readRevision)) {
             validRevisions.add(rev);
             return true;
         }
@@ -341,10 +388,8 @@ public class NodeDocument extends Docume
      *         given read revision.
      */
     @CheckForNull
-    public Node getNodeAtRevision(RevisionContext context,
-                                  DocumentStore store,
-                                  Revision readRevision) {
-        Revision min = getLiveRevision(context, store, readRevision, new HashSet<Revision>());
+    public Node getNodeAtRevision(RevisionContext context, Revision readRevision) {
+        Revision min = getLiveRevision(context, readRevision, new HashSet<Revision>());
         if (min == null) {
             // deleted
             return null;
@@ -359,10 +404,10 @@ public class NodeDocument extends Docume
             @SuppressWarnings("unchecked")
             Map<String, String> valueMap = (Map<String, String>) v;
             if (valueMap != null) {
-                if (valueMap instanceof TreeMap) {
+                if (valueMap instanceof NavigableMap) {
                     // TODO instanceof should be avoided
                     // use descending keys (newest first) if map is sorted
-                    valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
+                    valueMap = ((NavigableMap<String, String>) valueMap).descendingMap();
                 }
                 String value = getLatestValue(context, valueMap, min, readRevision);
                 String propertyName = Utils.unescapePropertyName(key);
@@ -416,6 +461,48 @@ public class NodeDocument extends Docume
     }
 
     /**
+     * Returns <code>true</code> if the newest valid <code>_deleted</code> entry not newer
+     * than <code>readRevision</code> is "true", or if no such entry is visible.
+     *
+     * @param context the revision context.
+     * @param readRevision the read revision.
+     * @param validRevisions the set of revisions already checked against
+     *                       <code>readRevision</code> and considered valid.
+     * @return <code>true</code> if deleted; <code>false</code> otherwise, including when no <code>_deleted</code> map is set.
+     */
+    public boolean isDeleted(RevisionContext context,
+                             Revision readRevision,
+                             Set<Revision> validRevisions) {
+        @SuppressWarnings("unchecked")
+        Map<String, String> valueMap = (Map<String, String>) get(NodeDocument.DELETED);
+        if (valueMap == null) {
+            return false; // no _deleted map at all: not considered deleted
+        }
+        if (valueMap instanceof NavigableMap) {
+            // TODO instanceof should be avoided
+            // use descending keys (newest first) if map is sorted
+            valueMap = ((NavigableMap<String, String>) valueMap).descendingMap();
+        }
+        Revision mostRecent = null;
+        boolean deleted = false;
+        for (Map.Entry<String, String> entry : valueMap.entrySet()) {
+            Revision r = Revision.fromString(entry.getKey());
+            if (isRevisionNewer(context, r, readRevision)) {
+                // ignore -> newer than readRevision
+                continue;
+            }
+            if (mostRecent != null && isRevisionNewer(context, mostRecent, r)) {
+                continue; // a newer valid entry was already found
+            }
+            if (isValidRevision(context, r, readRevision, validRevisions)) {
+                mostRecent = r;
+                deleted = "true".equals(entry.getValue());
+            }
+        }
+        return mostRecent == null || deleted; // no visible valid entry counts as deleted
+    }
+
+    /**
      * Get the earliest (oldest) revision where the node was alive at or before
      * the provided revision, if the node was alive at the given revision.
      *
@@ -425,9 +512,8 @@ public class NodeDocument extends Docume
      * @return the earliest revision, or null if the node is deleted at the
      *         given revision
      */
-    public Revision getLiveRevision(RevisionContext context,
-                                    DocumentStore store,
-                                    Revision maxRev,
+    @CheckForNull
+    public Revision getLiveRevision(RevisionContext context, Revision maxRev,
                                     Set<Revision> validRevisions) {
         @SuppressWarnings("unchecked")
         Map<String, String> valueMap = (Map<String, String>) get(NodeDocument.DELETED);
@@ -436,10 +522,10 @@ public class NodeDocument extends Docume
         }
         // first, search the newest deleted revision
         Revision deletedRev = null;
-        if (valueMap instanceof TreeMap) {
+        if (valueMap instanceof NavigableMap) {
             // TODO instanceof should be avoided
             // use descending keys (newest first) if map is sorted
-            valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
+            valueMap = ((NavigableMap<String, String>) valueMap).descendingMap();
         }
         for (String r : valueMap.keySet()) {
             String value = valueMap.get(r);
@@ -449,7 +535,7 @@ public class NodeDocument extends Docume
             }
             Revision propRev = Revision.fromString(r);
             if (isRevisionNewer(context, propRev, maxRev)
-                    || !isValidRevision(context, store, propRev, maxRev, validRevisions)) {
+                    || !isValidRevision(context, propRev, maxRev, validRevisions)) {
                 continue;
             }
             if (deletedRev == null || isRevisionNewer(context, propRev, deletedRev)) {
@@ -471,7 +557,7 @@ public class NodeDocument extends Docume
                 continue;
             }
             if (isRevisionNewer(context, propRev, maxRev)
-                    || !isValidRevision(context, store, propRev, maxRev, validRevisions)) {
+                    || !isValidRevision(context, propRev, maxRev, validRevisions)) {
                 continue;
             }
             if (liveRev == null || isRevisionNewer(context, liveRev, propRev)) {
@@ -482,75 +568,154 @@ public class NodeDocument extends Docume
     }
 
     /**
-     * Split this document in two.
+     * Returns <code>true</code> if the given operation is conflicting with this
+     * document.
      *
-     * @param context
-     * @param commitRevision
-     * @param splitDocumentAgeMillis
-     * @return
-     */
-    public UpdateOp[] splitDocument(@Nonnull RevisionContext context,
-                                    @Nonnull Revision commitRevision,
-                                    long splitDocumentAgeMillis) {
-        String id = getId();
-        String path = Utils.getPathFromId(id);
-        Long previous = (Long) get(NodeDocument.PREVIOUS);
-        if (previous == null) {
-            previous = 0L;
-        } else {
-            previous++;
+     * @param op the update operation.
+     * @param baseRevision the base revision for the update operation.
+     * @param context the revision context.
+     * @return <code>true</code> if conflicting, <code>false</code> otherwise.
+     */
+    public boolean isConflicting(@Nonnull UpdateOp op,
+                                 @Nonnull Revision baseRevision,
+                                 @Nonnull RevisionContext context) {
+        // did existence of node change after baseRevision?
+        @SuppressWarnings("unchecked")
+        Map<String, String> deleted = (Map<String, String>) get(DELETED);
+        if (deleted != null) {
+            for (Map.Entry<String, String> entry : deleted.entrySet()) {
+                if (isRevisionNewer(context, Revision.fromString(entry.getKey()), baseRevision)) {
+                    return true;
+                }
+            }
         }
-        UpdateOp old = new UpdateOp(id + "/" + previous, true);
-        setModified(old, commitRevision);
-        UpdateOp main = new UpdateOp(id, false);
-        setModified(main, commitRevision);
-        main.set(NodeDocument.PREVIOUS, previous);
-        for (Map.Entry<String, Object> e : data.entrySet()) {
-            String key = e.getKey();
-            if (key.equals(Document.ID)) {
-                // ok
-            } else if (key.equals(NodeDocument.MODIFIED)) {
-                // ok
-            } else if (key.equals(NodeDocument.PREVIOUS)) {
-                // ok
-            } else if (key.equals(NodeDocument.LAST_REV)) {
-                // only maintain the lastRev in the main document
-                main.setMap(NodeDocument.LAST_REV,
-                        String.valueOf(commitRevision.getClusterId()),
-                        commitRevision.toString());
-            } else {
-                // UpdateOp.DELETED,
-                // UpdateOp.REVISIONS,
-                // and regular properties
-                @SuppressWarnings("unchecked")
-                Map<String, Object> valueMap = (Map<String, Object>) e.getValue();
-                Revision latestRev = null;
-                for (String r : valueMap.keySet()) {
-                    Revision propRev = Revision.fromString(r);
-                    if (latestRev == null || isRevisionNewer(context, propRev, latestRev)) {
-                        latestRev = propRev;
-                    }
+
+        for (Map.Entry<String, UpdateOp.Operation> entry : op.changes.entrySet()) {
+            if (entry.getValue().type != UpdateOp.Operation.Type.SET_MAP_ENTRY) { // only map entry changes are checked
+                continue;
+            }
+            int idx = entry.getKey().indexOf('.'); // NOTE(review): assumes the key contains '.'; idx == -1 would make substring(0, idx) throw
+            String name = entry.getKey().substring(0, idx);
+            if (DELETED.equals(name)) {
+                // existence of node changed, this always conflicts with
+                // any other concurrent change
+                return true;
+            }
+            if (!Utils.isPropertyName(name)) { // entries that are not properties are skipped
+                continue;
+            }
+            // was this property touched after baseRevision?
+            @SuppressWarnings("unchecked")
+            Map<String, Object> changes = (Map<String, Object>) get(name);
+            if (changes == null) {
+                continue;
+            }
+            for (String rev : changes.keySet()) {
+                if (isRevisionNewer(context, Revision.fromString(rev), baseRevision)) {
+                    return true;
                 }
-                for (String r : valueMap.keySet()) {
-                    Revision propRev = Revision.fromString(r);
-                    Object v = valueMap.get(r);
-                    if (propRev.equals(latestRev)) {
-                        main.setMap(key, propRev.toString(), v);
-                    } else {
-                        long ageMillis = Revision.getCurrentTimestamp() - propRev.getTimestamp();
-                        if (ageMillis > splitDocumentAgeMillis) {
-                            old.setMapEntry(key, propRev.toString(), v);
-                        } else {
-                            main.setMap(key, propRev.toString(), v);
-                        }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns update operations to split this document. The implementation may
+     * decide to not return any operations if no splitting is required.
+     *
+     * @param context the revision context.
+     * @return the split operations.
+     */
+    @Nonnull
+    public Iterable<UpdateOp> split(@Nonnull RevisionContext context) {
+        String id = getId();
+        SortedMap<Revision, Range> previous = getPreviousRanges();
+        // what's the most recent previous revision?
+        Revision recentPrevious = null;
+        for (Revision rev : previous.keySet()) {
+            if (rev.getClusterId() != context.getClusterId()) {
+                continue;
+            }
+            if (recentPrevious == null
+                    || isRevisionNewer(context, rev, recentPrevious)) {
+                recentPrevious = rev;
+            }
+        }
+        NavigableMap<Revision, String> splitRevs
+                = Maps.newTreeMap(context.getRevisionComparator());
+        Map<String, String> revisions = getRevisionsMap();
+        // only consider if there are enough revisions
+        if (revisions.size() > REVISIONS_SPLIT_OFF_SIZE) {
+            // collect commits of this cluster node after the
+            // most recent previous split revision
+            for (Map.Entry<String, String> entry : revisions.entrySet()) {
+                Revision rev = Revision.fromString(entry.getKey());
+                if (rev.getClusterId() != context.getClusterId()) {
+                    continue;
+                }
+                if (recentPrevious == null
+                        || isRevisionNewer(context, rev, recentPrevious)) {
+                    String commitValue = entry.getValue();
+                    if (Utils.isCommitted(commitValue)) {
+                        splitRevs.put(rev, commitValue);
                     }
                 }
             }
         }
-        if (Commit.PURGE_OLD_REVISIONS) {
-            old = null;
+        List<UpdateOp> splitOps = Collections.emptyList();
+        if (splitRevs.size() > REVISIONS_SPLIT_OFF_SIZE) {
+            // enough revisions to split off
+            splitOps = new ArrayList<UpdateOp>(2);
+            // keep the most recent in the main document
+            splitRevs.remove(splitRevs.lastKey());
+            // move the others to another document
+            Revision high = splitRevs.lastKey();
+            Revision low = splitRevs.firstKey();
+            UpdateOp main = new UpdateOp(id, false);
+            main.setMapEntry(PREVIOUS, high.toString(), low.toString()); // PREVIOUS: high -> low of the split-off range
+            UpdateOp old = new UpdateOp(Utils.getPreviousIdFor(id, high), true);
+            old.set(ID, old.getKey());
+            for (Map.Entry<Revision, String> entry : splitRevs.entrySet()) {
+                String r = entry.getKey().toString();
+                main.removeMapEntry(REVISIONS, r);
+                old.setMapEntry(REVISIONS, r, entry.getValue());
+            }
+            splitOps.add(old);
+            splitOps.add(main);
         }
-        return new UpdateOp[]{old, main};
+        return splitOps;
+    }
+
+    @Override
+    @Nonnull
+    protected Map<?, ?> transformAndSeal(@Nonnull Map<Object, Object> map,
+                                         @Nullable String key,
+                                         int level) {
+        if (level == 1) {
+            if (PREVIOUS.equals(key)) { // entries: high revision -> low revision of a split-off range
+                SortedMap<Revision, Range> transformed = new TreeMap<Revision, Range>(
+                        new Comparator<Revision>() {
+                            @Override
+                            public int compare(Revision o1, Revision o2) {
+                                // in reverse order!
+                                int c = o2.compareRevisionTime(o1);
+                                if (c == 0) { // same revision time: ascending clusterId
+                                    c = o1.getClusterId() < o2.getClusterId()
+                                            ? -1
+                                            : (o1.getClusterId() == o2.getClusterId() ? 0 : 1);
+                                }
+                                return c;
+                            }
+                        });
+                for (Map.Entry<Object, Object> entry : map.entrySet()) {
+                    Revision high = Revision.fromString(entry.getKey().toString());
+                    Revision low = Revision.fromString(entry.getValue().toString());
+                    transformed.put(high, new Range(high, low));
+                }
+                return Collections.unmodifiableSortedMap(transformed); // newest first, unmodifiable
+            }
+        }
+        return super.transformAndSeal(map, key, level);
     }
 
     //-------------------------< UpdateOp modifiers >---------------------------
@@ -587,6 +752,12 @@ public class NodeDocument extends Docume
                 checkNotNull(revision).toString(), commitRootDepth);
     }
 
+    public static void setDeleted(@Nonnull UpdateOp op,
+                                  @Nonnull Revision revision,
+                                  boolean deleted) { // stored as "true"/"false" under the revision key of _deleted
+        checkNotNull(op).setMapEntry(DELETED, checkNotNull(revision).toString(), String.valueOf(deleted));
+    }
+
     //----------------------------< internal >----------------------------------
 
     /**
@@ -603,28 +774,33 @@ public class NodeDocument extends Docume
     }
 
     /**
-     * TODO: turn into instance method?
      * Returns <code>true</code> if the given revision
-     * {@link Utils#isCommitted(String)} in the revisions map and is visible
-     * from the <code>readRevision</code>.
+     * {@link Utils#isCommitted(String)} in the revisions map (including
+     * revisions split off to previous documents) and is visible from the
+     * <code>readRevision</code>.
      *
      * @param revision  the revision to check.
      * @param readRevision the read revision.
-     * @param revisions the revisions map, or <code>null</code> if none is set.
      * @return <code>true</code> if the revision is committed, otherwise
      *         <code>false</code>.
      */
-    private static boolean isCommitted(@Nonnull RevisionContext context,
-                                       @Nonnull Revision revision,
-                                       @Nonnull Revision readRevision,
-                                       @Nullable Map<String, String> revisions) {
+    private boolean isCommitted(@Nonnull RevisionContext context,
+                                @Nonnull Revision revision,
+                                @Nonnull Revision readRevision) {
         if (revision.equals(readRevision)) {
             return true;
         }
-        if (revisions == null) {
-            return false;
+        String r = revision.toString();
+        String value = getRevisionsMap().get(r);
+        if (value == null) {
+            // check previous
+            for (NodeDocument prev : getPreviousDocs(revision)) {
+                value = prev.getRevisionsMap().get(r);
+                if (value != null) {
+                    break;
+                }
+            }
         }
-        String value = revisions.get(revision.toString());
         if (value == null) {
             return false;
         }
@@ -648,8 +824,8 @@ public class NodeDocument extends Docume
     }
 
     private static boolean includeRevision(RevisionContext context,
-                                    Revision x,
-                                    Revision requestRevision) {
+                                           Revision x,
+                                           Revision requestRevision) {
         Branch b = context.getBranches().getBranch(x);
         if (b != null) {
             // only include if requested revision is also a branch revision
@@ -682,6 +858,7 @@ public class NodeDocument extends Docume
      * @param max the maximum revision
      * @return the value, or null if not found
      */
+    @CheckForNull
     private String getLatestValue(@Nonnull RevisionContext context,
                                   @Nonnull Map<String, String> valueMap,
                                   @Nullable Revision min,
@@ -703,4 +880,219 @@ public class NodeDocument extends Docume
         }
         return value;
     }
+
+    /**
+     * Returns a view of the {@link #REVISIONS} map of this document combined
+     * with the revisions of all previous (split-off) documents. If no
+     * {@link #PREVIOUS} entry is set, the local revisions map is returned as is.
+     *
+     * @return the revisions of this and all previous documents.
+     */
+    private Map<String, String> getRevisions() {
+        final Map<String, String> map = getRevisionsMap();
+        if (!data.containsKey(PREVIOUS)) {
+            return map;
+        }
+        final Set<Map.Entry<String, String>> revisions
+                = new AbstractSet<Map.Entry<String, String>>() {
+
+            @Override
+            @Nonnull
+            public Iterator<Map.Entry<String, String>> iterator() {
+                // entries of this document first, then those of each previous document
+                return Iterators.concat(map.entrySet().iterator(),
+                        Iterators.concat(new Iterator<Iterator<Map.Entry<String, String>>>() {
+                            private final Iterator<NodeDocument> previous
+                                    = getPreviousDocs(null).iterator();
+
+                            @Override
+                            public boolean hasNext() {
+                                return previous.hasNext();
+                            }
+
+                            @Override
+                            public Iterator<Map.Entry<String, String>> next() {
+                                return previous.next().getRevisions().entrySet().iterator();
+                            }
+
+                            @Override
+                            public void remove() {
+                                throw new UnsupportedOperationException();
+                            }
+                        }));
+            }
+
+            @Override
+            public int size() {
+                // iterates over (and loads) all previous documents
+                int size = map.size();
+                for (NodeDocument prev : getPreviousDocs(null)) {
+                    size += prev.getRevisions().size();
+                }
+                return size;
+            }
+        };
+        return new AbstractMap<String, String>() {
+
+            @Override
+            @Nonnull
+            public Set<Entry<String, String>> entrySet() {
+                return revisions;
+            }
+
+            @Override
+            public String get(Object key) {
+                // first check revisions map of this document
+                String value = map.get(key);
+                if (value != null) {
+                    return value;
+                }
+                // then the previous documents that contain the revision
+                Revision r = Revision.fromString(key.toString());
+                for (NodeDocument prev : getPreviousDocs(r)) {
+                    value = prev.getRevisions().get(key);
+                    if (value != null) {
+                        return value;
+                    }
+                }
+                // not found
+                return null;
+            }
+
+            @Override
+            public boolean containsKey(Object key) {
+                // can use get()
+                // the revisions map does not have null values
+                return get(key) != null;
+            }
+
+        };
+    }
+
+    /**
+     * Returns the {@link #REVISIONS} map stored in this document itself,
+     * without the revisions split off to previous documents.
+     *
+     * @return the local revisions map, or an empty map if none is set.
+     */
+    @Nonnull
+    Map<String, String> getRevisionsMap() {
+        @SuppressWarnings("unchecked")
+        Map<String, String> map = (Map<String, String>) get(REVISIONS);
+        if (map == null) {
+            map = Collections.emptyMap();
+        }
+        return map;
+    }
+
+    /**
+     * Returns the previous documents whose revision range includes the given
+     * revision and which actually contain it. If the <code>revision</code> is
+     * <code>null</code>, then all previous documents are returned. Documents
+     * are loaded from the store lazily while iterating; documents that cannot
+     * be found are logged and skipped.
+     *
+     * @param revision the revision to match or <code>null</code>.
+     * @return previous documents.
+     */
+    private Iterable<NodeDocument> getPreviousDocs(final @Nullable Revision revision) {
+        Iterable<NodeDocument> docs = Iterables.transform(
+                Iterables.filter(getPreviousRanges().entrySet(),
+                new Predicate<Map.Entry<Revision, Range>>() {
+            @Override
+            public boolean apply(Map.Entry<Revision, Range> input) {
+                return revision == null || input.getValue().includes(revision);
+            }
+        }), new Function<Map.Entry<Revision, Range>, NodeDocument>() {
+            @Nullable
+            @Override
+            public NodeDocument apply(Map.Entry<Revision, Range> input) {
+                Revision r = input.getKey();
+                String prevId = Utils.getPreviousIdFor(getId(), r);
+                NodeDocument prev = store.find(Collection.NODES, prevId);
+                if (prev == null) {
+                    log.warn("Document with previous revisions not found: {}", prevId);
+                }
+                return prev;
+            }
+        });
+        // filter out null docs and check if the revision is actually in there
+        return Iterables.filter(docs, new Predicate<NodeDocument>() {
+            @Override
+            public boolean apply(@Nullable NodeDocument input) {
+                if (input == null) {
+                    return false;
+                }
+                return revision == null || input.containsRevision(revision);
+            }
+        });
+    }
+
+    /**
+     * Returns previous revision ranges for this document. The revision keys are
+     * sorted descending, newest first!
+     *
+     * @return the previous ranges for this document.
+     */
+    @Nonnull
+    private SortedMap<Revision, Range> getPreviousRanges() {
+        @SuppressWarnings("unchecked")
+        SortedMap<Revision, Range> previous = (SortedMap<Revision, Range>) get(PREVIOUS);
+        if (previous == null) {
+            previous = EMPTY_RANGE_MAP;
+        }
+        return previous;
+    }
+
+    /**
+     * A range of revisions of a single cluster node, with both bounds
+     * inclusive. Entries of the {@link NodeDocument#PREVIOUS} map describe such
+     * a range: the key is the high bound, the value the low bound.
+     */
+    private static final class Range {
+
+        final Revision high;
+        final Revision low;
+
+        /**
+         * Creates a range of revisions, with both bounds inclusive.
+         *
+         * @param high the high bound.
+         * @param low the low bound.
+         * @throws IllegalArgumentException if the bounds have different
+         *         cluster ids or <code>high</code> is not later than
+         *         <code>low</code>.
+         */
+        Range(@Nonnull Revision high, @Nonnull Revision low) {
+            this.high = checkNotNull(high);
+            this.low = checkNotNull(low);
+            checkArgument(high.getClusterId() == low.getClusterId(),
+                    "Revisions must have the same clusterId");
+            checkArgument(high.compareRevisionTime(low) > 0,
+                    "High Revision must be later than low Revision");
+        }
+
+        /**
+         * Returns <code>true</code> if the given revision is within this range.
+         *
+         * @param r the revision to check.
+         * @return <code>true</code> if within this range; <code>false</code>
+         * otherwise.
+         */
+        boolean includes(Revision r) {
+            // both bounds share one clusterId; revisions of other cluster nodes never match
+            return r.getClusterId() == high.getClusterId()
+                    && high.compareRevisionTime(r) >= 0
+                    && low.compareRevisionTime(r) <= 0;
+        }
+
+        /**
+         * Returns the string form of the low bound, which is also the value
+         * written for this range to the {@link NodeDocument#PREVIOUS} map (the key is the high bound).
+         */
+        @Override
+        public String toString() {
+            return low.toString();
+        }
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/util/Utils.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/util/Utils.java Tue Sep 10 11:37:48 2013
@@ -60,14 +60,14 @@ public class Utils {
     }
 
     @SuppressWarnings("unchecked")
-    public static int estimateMemoryUsage(Map<String, Object> map) {
+    public static int estimateMemoryUsage(Map<?, Object> map) {
         if (map == null) {
             return 0;
         }
         int size = 0;
 
-        for (Entry<String, Object> e : map.entrySet()) {
-            size += e.getKey().length() * 2;
+        for (Entry<?, Object> e : map.entrySet()) {
+            size += e.getKey().toString().length() * 2;
             Object o = e.getValue();
             if (o instanceof String) {
                 size += ((String) o).length() * 2;
@@ -180,6 +180,10 @@ public class Utils {
         return id.substring(index + 1);
     }
 
+    public static String getPreviousIdFor(String id, Revision r) {
+        return getIdFromPath("p" + PathUtils.concat(getPathFromId(id), r.toString()));
+    }
+
     /**
      * Deep copy of a map that may contain map values.
      * 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/BaseMongoMKTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/BaseMongoMKTest.java?rev=1521438&r1=1521437&r2=1521438&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/BaseMongoMKTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/BaseMongoMKTest.java Tue Sep 10 11:37:48 2013
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
 import org.apache.jackrabbit.mk.api.MicroKernel;
+import org.junit.After;
 import org.junit.Before;
 
 /**
@@ -35,4 +36,12 @@ public class BaseMongoMKTest extends Mon
     protected MicroKernel getMicroKernel() {
         return mk;
     }
+
+    /**
+     * Disposes the MongoMK instance used by the test, if there is one, and
+     * resets the field to <code>null</code>.
+     */
+    @After
+    public void disposeMongoMK() {
+        if (mk == null) {
+            return;
+        }
+        mk.dispose();
+        mk = null;
+    }
 }

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java?rev=1521438&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java Tue Sep 10 11:37:48 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Check correct splitting of documents (OAK-926).
+ */
+public class DocumentSplitTest extends BaseMongoMKTest {
+
+    /**
+     * Commits enough changes below the root to exceed
+     * {@link NodeDocument#REVISIONS_SPLIT_OFF_SIZE}, runs the background
+     * operations and then verifies that the root document keeps a single
+     * local revision while still reporting every revision as contained and
+     * committed.
+     */
+    @Test
+    public void splitRevisions() throws Exception {
+        DocumentStore store = mk.getDocumentStore();
+        // every revision seen on the root document, as returned by commit()
+        Set<String> revisions = Sets.newHashSet();
+        NodeDocument doc = store.find(Collection.NODES, Utils.getIdFromPath("/"));
+        assertNotNull(doc);
+        revisions.addAll(doc.getRevisionsMap().keySet());
+        int numRevs = 1; // MongoMK initializes with a root node with a single revision
+        revisions.add(mk.commit("/", "+\"foo\":{}+\"bar\":{}", null, null));
+        numRevs++;
+        // create nodes
+        // The post-increment runs the body for numRevs = 2..SPLIT_OFF_SIZE
+        // (before increment), i.e. SPLIT_OFF_SIZE - 1 commits, giving
+        // SPLIT_OFF_SIZE + 1 revisions in total. Presumably each commit's
+        // revision is recorded on the root document (common ancestor of
+        // foo/... and bar/...) - TODO confirm.
+        while (numRevs++ <= NodeDocument.REVISIONS_SPLIT_OFF_SIZE) {
+            revisions.add(mk.commit("/", "+\"foo/node-" + numRevs + "\":{}" +
+                    "+\"bar/node-" + numRevs + "\":{}", null, null));
+        }
+        // background operations are expected to perform the split
+        mk.runBackgroundOperations();
+        String head = mk.getHeadRevision();
+        // re-read the root document to observe the state after the split
+        doc = store.find(Collection.NODES, Utils.getIdFromPath("/"));
+        assertNotNull(doc);
+        Map<String, String> revs = doc.getRevisionsMap();
+        // one remaining in the local revisions map
+        assertEquals(1, revs.size());
+        // split-off revisions must still be visible through the document
+        for (String r : revisions) {
+            Revision rev = Revision.fromString(r);
+            assertTrue(doc.containsRevision(rev));
+            assertTrue(doc.isCommitted(rev));
+        }
+        // check if document is still there
+        assertNotNull(doc.getNodeAtRevision(mk, Revision.fromString(head)));
+        // Commit once more after the split and flush; no assertions follow,
+        // so this part only checks that these calls complete without an
+        // exception.
+        mk.commit("/", "+\"baz\":{}", null, null);
+        mk.setAsyncDelay(0);
+        mk.backgroundWrite();
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/util/UtilsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/util/UtilsTest.java?rev=1521438&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/util/UtilsTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/util/UtilsTest.java Tue Sep 10 11:37:48 2013
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk.util;
+
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link Utils}.
+ */
+public class UtilsTest {
+
+    /**
+     * The previous-document id is the "p"-prefixed path of the original
+     * document with the revision appended as its last path element, one
+     * level deeper than the original id.
+     */
+    @Test
+    public void getPreviousIdFor() {
+        Revision rev = new Revision(System.currentTimeMillis(), 0, 0);
+        String revStr = rev.toString();
+        assertEquals("1:p/" + revStr, Utils.getPreviousIdFor("0:/", rev));
+        assertEquals("2:p/test/" + revStr, Utils.getPreviousIdFor("1:/test", rev));
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/util/UtilsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/util/UtilsTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL



Mime
View raw message