jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r1567066 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/document/
Date Tue, 11 Feb 2014 11:44:08 GMT
Author: mreutegg
Date: Tue Feb 11 11:44:07 2014
New Revision: 1567066

URL: http://svn.apache.org/r1567066
Log:
OAK-1406: Background operations block writes

Only acquire exclusive lock when needed. Invalidate cache in background
thread without holding lock.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java	Tue Feb 11 11:44:07 2014
@@ -89,7 +89,7 @@ public class DocumentMK implements Micro
     }
 
     void backgroundRead() {
-        nodeStore.backgroundRead();
+        nodeStore.backgroundRead(true);
     }
 
     void backgroundWrite() {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	Tue Feb 11 11:44:07 2014
@@ -26,21 +26,17 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.ref.WeakReference;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -53,6 +49,8 @@ import com.google.common.cache.Cache;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mk.blobs.BlobStore;
 import org.apache.jackrabbit.mk.json.JsopStream;
@@ -84,11 +82,6 @@ import org.slf4j.LoggerFactory;
 public final class DocumentNodeStore
         implements NodeStore, RevisionContext, Observable {
     
-    /**
-     * The maximum number of document to update at once in a multi update.
-     */
-    static final int BACKGROUND_MULTI_UPDATE_LIMIT = 10000;
-
     private static final Logger LOG = LoggerFactory.getLogger(DocumentNodeStore.class);
 
     /**
@@ -325,7 +318,7 @@ public final class DocumentNodeStore
             // initialize branchCommits
             branches.init(store, this);
             // initial reading of the revisions of other cluster nodes
-            backgroundRead();
+            backgroundRead(false);
             if (headRevision == null) {
                 // no revision read from other cluster nodes
                 setHeadRevision(newRevision());
@@ -796,7 +789,7 @@ public final class DocumentNodeStore
         if (isNew) {
             CacheValue key = childNodeCacheKey(path, rev, null);
             Node.Children c = new Node.Children();
-            TreeSet<String> set = new TreeSet<String>(added);
+            Set<String> set = Sets.newTreeSet(added);
             set.removeAll(removed);
             for (String p : added) {
                 set.add(Utils.unshareString(p));
@@ -824,7 +817,7 @@ public final class DocumentNodeStore
             NodeDocument.Children docChildren = docChildrenCache.getIfPresent(docChildrenKey);
             if (docChildren != null) {
                 int currentSize = docChildren.childNames.size();
-                TreeSet<String> names = new TreeSet<String>(docChildren.childNames);
+                NavigableSet<String> names = Sets.newTreeSet(docChildren.childNames);
                 // incomplete cache entries must not be updated with
                 // names at the end of the list because there might be
                 // a next name in DocumentStore smaller than the one added
@@ -922,7 +915,7 @@ public final class DocumentNodeStore
             // first revision is the ancestor (tailSet is inclusive)
             // do not undo changes for this revision
             Revision base = it.next();
-            Map<String, UpdateOp> operations = new HashMap<String, UpdateOp>();
+            Map<String, UpdateOp> operations = Maps.newHashMap();
             while (it.hasNext()) {
                 Revision reset = it.next();
                 getRoot(reset).compareAgainstBaseState(getRoot(base),
@@ -1227,25 +1220,12 @@ public final class DocumentNodeStore
             return;
         }
         try {
-            
-            // does not create new revisions
+            // split documents (does not create new revisions)
             backgroundSplit();
-            
-            // we need to protect backgroundRead as well,
-            // as increment set the head revision in the read operation
-            // (the read operation might see changes from other cluster nodes,
-            // and so create a new head revision for the current cluster node,
-            // to order revisions)
-            Lock writeLock = backgroundOperationLock.writeLock();
-            writeLock.lock();
-            try {
-                backgroundWrite();
-                backgroundRead();
-                dispatcher.contentChanged(getRoot(), null);
-            } finally {
-                writeLock.unlock();
-            }
-            
+            // write back pending updates to _lastRev
+            backgroundWrite();
+            // pull in changes from other cluster nodes
+            backgroundRead(true);
         } catch (RuntimeException e) {
             if (isDisposed.get()) {
                 return;
@@ -1261,7 +1241,13 @@ public final class DocumentNodeStore
         clusterNodeInfo.renewLease(asyncDelay);
     }
 
-    void backgroundRead() {
+    /**
+     * Perform a background read and make external changes visible.
+     *
+     * @param dispatchChange whether to dispatch external changes
+     *                       to {@link #dispatcher}.
+     */
+    void backgroundRead(boolean dispatchChange) {
         String id = Utils.getIdFromPath("/");
         NodeDocument doc = store.find(Collection.NODES, id, asyncDelay);
         if (doc == null) {
@@ -1270,37 +1256,51 @@ public final class DocumentNodeStore
         Map<Integer, Revision> lastRevMap = doc.getLastRev();
 
         Revision.RevisionComparator revisionComparator = getRevisionComparator();
-        boolean hasNewRevisions = false;
         // the (old) head occurred first
         Revision headSeen = Revision.newRevision(0);
         // then we saw this new revision (from another cluster node)
         Revision otherSeen = Revision.newRevision(0);
+
+        Map<Revision, Revision> externalChanges = Maps.newHashMap();
         for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
             int machineId = e.getKey();
             if (machineId == clusterId) {
+                // ignore own lastRev
                 continue;
             }
             Revision r = e.getValue();
             Revision last = lastKnownRevision.get(machineId);
             if (last == null || r.compareRevisionTime(last) > 0) {
-                if (!hasNewRevisions) {
-                    // publish our revision once before any foreign revision
-
-                    // the latest revisions of the current cluster node
-                    // happened before the latest revisions of other cluster nodes
-                    revisionComparator.add(newRevision(), headSeen);
-                }
-                hasNewRevisions = true;
                 lastKnownRevision.put(machineId, r);
-                revisionComparator.add(r, otherSeen);
+                externalChanges.put(r, otherSeen);
             }
         }
-        if (hasNewRevisions) {
+
+        if (!externalChanges.isEmpty()) {
+            // invalidate caches
             store.invalidateCache();
             // TODO only invalidate affected items
             docChildrenCache.invalidateAll();
-            // the head revision is after other revisions
-            setHeadRevision(newRevision());
+
+            // make sure update to revision comparator is atomic
+            // and no local commit is in progress
+            backgroundOperationLock.writeLock().lock();
+            try {
+                // the latest revisions of the current cluster node
+                // happened before the latest revisions of other cluster nodes
+                revisionComparator.add(newRevision(), headSeen);
+                // then we saw other revisions
+                for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
+                    revisionComparator.add(e.getKey(), e.getValue());
+                }
+                // the new head revision is after other revisions
+                setHeadRevision(newRevision());
+                if (dispatchChange) {
+                    dispatcher.contentChanged(getRoot(), null);
+                }
+            } finally {
+                backgroundOperationLock.writeLock().unlock();
+            }
         }
         revisionComparator.purge(Revision.getCurrentTimestamp() - REMEMBER_REVISION_ORDER_MILLIS);
     }
@@ -1330,50 +1330,13 @@ public final class DocumentNodeStore
         if (unsavedLastRevisions.getPaths().isEmpty()) {
             return;
         }
-        ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
-        // sort by depth (high depth first), then path
-        Collections.sort(paths, PathComparator.INSTANCE);
-
-        UpdateOp updateOp = null;
-        Revision lastRev = null;
-        List<String> ids = new ArrayList<String>();
-        for (int i = 0; i < paths.size(); ) {
-            String p = paths.get(i);
-            Revision r = unsavedLastRevisions.get(p);
-            if (r == null) {
-                i++;
-                continue;
-            }
-            int size = ids.size();
-            if (updateOp == null) {
-                // create UpdateOp
-                Commit commit = new Commit(this, null, r);
-                commit.touchNode(p);
-                updateOp = commit.getUpdateOperationForNode(p);
-                lastRev = r;
-                ids.add(Utils.getIdFromPath(p));
-                i++;
-            } else if (r.equals(lastRev)) {
-                // use multi update when possible
-                ids.add(Utils.getIdFromPath(p));
-                i++;
-            }
-            // call update if any of the following is true:
-            // - this is the second-to-last or last path (update last path, the
-            //   root document, individually)
-            // - revision is not equal to last revision (size of ids didn't change)
-            // - the update limit is reached
-            if (i + 2 > paths.size()
-                    || size == ids.size()
-                    || ids.size() >= BACKGROUND_MULTI_UPDATE_LIMIT) {
-                store.update(Collection.NODES, ids, updateOp);
-                for (String id : ids) {
-                    unsavedLastRevisions.remove(Utils.getPathFromId(id));
-                }
-                ids.clear();
-                updateOp = null;
-                lastRev = null;
-            }
+        // write back the pending _lastRevs with an exclusive lock to
+        // ensure consistency
+        backgroundOperationLock.writeLock().lock();
+        try {
+            unsavedLastRevisions.persist(this);
+        } finally {
+            backgroundOperationLock.writeLock().unlock();
         }
     }
 
@@ -1435,7 +1398,7 @@ public final class DocumentNodeStore
         long minValue = Commit.getModified(minTimestamp);
         String fromKey = Utils.getKeyLowerLimit(path);
         String toKey = Utils.getKeyUpperLimit(path);
-        Set<String> paths = new HashSet<String>();
+        Set<String> paths = Sets.newHashSet();
         for (NodeDocument doc : store.query(Collection.NODES, fromKey, toKey,
                 NodeDocument.MODIFIED, minValue, Integer.MAX_VALUE)) {
             paths.add(Utils.getPathFromId(doc.getId()));
@@ -1495,7 +1458,7 @@ public final class DocumentNodeStore
     }
 
     private void diffFewChildren(JsopWriter w, Node.Children fromChildren, Revision fromRev, Node.Children toChildren, Revision toRev) {
-        Set<String> childrenSet = new HashSet<String>(toChildren.children);
+        Set<String> childrenSet = Sets.newHashSet(toChildren.children);
         for (String n : fromChildren.children) {
             if (!childrenSet.contains(n)) {
                 w.tag('-').value(n).newline();
@@ -1513,7 +1476,7 @@ public final class DocumentNodeStore
                 }
             }
         }
-        childrenSet = new HashSet<String>(fromChildren.children);
+        childrenSet = Sets.newHashSet(fromChildren.children);
         for (String n : toChildren.children) {
             if (!childrenSet.contains(n)) {
                 w.tag('+').key(n).object().endObject().newline();

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java	Tue Feb 11 11:44:07 2014
@@ -547,22 +547,6 @@ public class NodeDocument extends Docume
     }
 
     /**
-     * Returns <code>true</code> if this node is considered deleted at the
-     * given <code>readRevision</code>.
-     *
-     * @param context the revision context.
-     * @param readRevision the read revision.
-     * @param validRevisions the set of revisions already checked against
-     *                       <code>readRevision</code> and considered valid.
-     * @return <code>true</code> if deleted, <code>false</code> otherwise.
-     */
-    public boolean isDeleted(RevisionContext context,
-                             Revision readRevision,
-                             Set<Revision> validRevisions) {
-        return getLiveRevision(context, readRevision, validRevisions) == null;
-    }
-
-    /**
      * Get the earliest (oldest) revision where the node was alive at or before
      * the provided revision, if the node was alive at the given revision.
      * 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java	Tue Feb 11 11:44:07 2014
@@ -16,19 +16,24 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 
 /**
  * Keeps track of when nodes where last modified. To be persisted later by
@@ -36,6 +41,11 @@ import static com.google.common.base.Pre
  */
 class UnsavedModifications {
 
+    /**
+     * The maximum number of document to update at once in a multi update.
+     */
+    static final int BACKGROUND_MULTI_UPDATE_LIMIT = 10000;
+
     private final ConcurrentHashMap<String, Revision> map = new ConcurrentHashMap<String, Revision>();
 
     /**
@@ -76,11 +86,6 @@ class UnsavedModifications {
         return map.get(path);
     }
 
-    @CheckForNull
-    public Revision remove(String path) {
-        return map.remove(path);
-    }
-
     @Nonnull
     public Collection<String> getPaths() {
         return map.keySet();
@@ -127,4 +132,63 @@ class UnsavedModifications {
             });
         }
     }
+
+    /**
+     * Persist the pending changes to _lastRev to the given store. This method
+     * does not guarantee consistency when there are concurrent updates on
+     * this instance through {@link #put(String, Revision)}. The caller must
+     * use proper synchronization to ensure no paths are added while this method
+     * is called.
+     *
+     * @param store the document node store.
+     */
+    public void persist(@Nonnull DocumentNodeStore store) {
+        checkNotNull(store);
+
+        ArrayList<String> paths = new ArrayList<String>(getPaths());
+        // sort by depth (high depth first), then path
+        Collections.sort(paths, PathComparator.INSTANCE);
+
+        UpdateOp updateOp = null;
+        Revision lastRev = null;
+        List<String> ids = new ArrayList<String>();
+        for (int i = 0; i < paths.size(); ) {
+            String p = paths.get(i);
+            Revision r = map.get(p);
+            if (r == null) {
+                i++;
+                continue;
+            }
+            int size = ids.size();
+            if (updateOp == null) {
+                // create UpdateOp
+                Commit commit = new Commit(store, null, r);
+                commit.touchNode(p);
+                updateOp = commit.getUpdateOperationForNode(p);
+                lastRev = r;
+                ids.add(Utils.getIdFromPath(p));
+                i++;
+            } else if (r.equals(lastRev)) {
+                // use multi update when possible
+                ids.add(Utils.getIdFromPath(p));
+                i++;
+            }
+            // call update if any of the following is true:
+            // - this is the second-to-last or last path (update last path, the
+            //   root document, individually)
+            // - revision is not equal to last revision (size of ids didn't change)
+            // - the update limit is reached
+            if (i + 2 > paths.size()
+                    || size == ids.size()
+                    || ids.size() >= BACKGROUND_MULTI_UPDATE_LIMIT) {
+                store.getDocumentStore().update(NODES, ids, updateOp);
+                for (String id : ids) {
+                    map.remove(Utils.getPathFromId(id));
+                }
+                ids.clear();
+                updateOp = null;
+                lastRev = null;
+            }
+        }
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java	Tue Feb 11 11:44:07 2014
@@ -35,7 +35,7 @@ public class BackgroundWriteTest {
                 new TestStore()).setAsyncDelay(0).open();
         List<String> paths = new ArrayList<String>();
         StringBuilder sb = new StringBuilder();
-        for (int i = 0; paths.size() < DocumentNodeStore.BACKGROUND_MULTI_UPDATE_LIMIT * 2; i++) {
+        for (int i = 0; paths.size() < UnsavedModifications.BACKGROUND_MULTI_UPDATE_LIMIT * 2; i++) {
             String child = "node-" + i;
             sb.append("+\"").append(child).append("\":{}");
             for (int j = 0; j < 1000; j++) {
@@ -62,7 +62,7 @@ public class BackgroundWriteTest {
         public <T extends Document> void update(Collection<T> collection,
                                                 List<String> keys,
                                                 UpdateOp updateOp) {
-            assertTrue(keys.size() <= DocumentNodeStore.BACKGROUND_MULTI_UPDATE_LIMIT);
+            assertTrue(keys.size() <= UnsavedModifications.BACKGROUND_MULTI_UPDATE_LIMIT);
             super.update(collection, keys, updateOp);
         }
     }



Mime
View raw message