Return-Path: X-Original-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Delivered-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D1AF610195 for ; Tue, 11 Feb 2014 11:44:36 +0000 (UTC) Received: (qmail 80864 invoked by uid 500); 11 Feb 2014 11:44:36 -0000 Delivered-To: apmail-jackrabbit-oak-commits-archive@jackrabbit.apache.org Received: (qmail 80821 invoked by uid 500); 11 Feb 2014 11:44:33 -0000 Mailing-List: contact oak-commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: oak-dev@jackrabbit.apache.org Delivered-To: mailing list oak-commits@jackrabbit.apache.org Received: (qmail 80813 invoked by uid 99); 11 Feb 2014 11:44:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Feb 2014 11:44:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Feb 2014 11:44:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 379D923889FD; Tue, 11 Feb 2014 11:44:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1567066 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/document/ Date: Tue, 11 Feb 2014 11:44:08 -0000 To: oak-commits@jackrabbit.apache.org From: mreutegg@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140211114408.379D923889FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mreutegg Date: Tue Feb 11 11:44:07 2014 New Revision: 1567066 URL: http://svn.apache.org/r1567066 Log: OAK-1406: Background operations block writes Only acquire exclusive lock when needed. Invalidate cache in background thread without holding lock. Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1567066&r1=1567065&r2=1567066&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Tue Feb 11 11:44:07 2014 @@ -89,7 +89,7 @@ public class DocumentMK implements Micro } void backgroundRead() { - nodeStore.backgroundRead(); + nodeStore.backgroundRead(true); } void backgroundWrite() { Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1567066&r1=1567065&r2=1567066&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Feb 11 11:44:07 2014 @@ -26,21 +26,17 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.lang.ref.WeakReference; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -53,6 +49,8 @@ import com.google.common.cache.Cache; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import org.apache.jackrabbit.mk.api.MicroKernelException; import org.apache.jackrabbit.mk.blobs.BlobStore; import org.apache.jackrabbit.mk.json.JsopStream; @@ -84,11 +82,6 @@ import org.slf4j.LoggerFactory; public final class DocumentNodeStore implements NodeStore, RevisionContext, Observable { - /** - * The maximum number of document to update at once in a multi update. - */ - static final int BACKGROUND_MULTI_UPDATE_LIMIT = 10000; - private static final Logger LOG = LoggerFactory.getLogger(DocumentNodeStore.class); /** @@ -325,7 +318,7 @@ public final class DocumentNodeStore // initialize branchCommits branches.init(store, this); // initial reading of the revisions of other cluster nodes - backgroundRead(); + backgroundRead(false); if (headRevision == null) { // no revision read from other cluster nodes setHeadRevision(newRevision()); @@ -796,7 +789,7 @@ public final class DocumentNodeStore if (isNew) { CacheValue key = childNodeCacheKey(path, rev, null); Node.Children c = new Node.Children(); - TreeSet set = new TreeSet(added); + Set set = Sets.newTreeSet(added); set.removeAll(removed); for (String p : added) { set.add(Utils.unshareString(p)); @@ -824,7 +817,7 @@ public final class DocumentNodeStore NodeDocument.Children docChildren = docChildrenCache.getIfPresent(docChildrenKey); if (docChildren != null) { int currentSize = docChildren.childNames.size(); - TreeSet names = new TreeSet(docChildren.childNames); + NavigableSet names = Sets.newTreeSet(docChildren.childNames); // incomplete cache entries must not be updated with // names at the end of the list because there might be // a next name in DocumentStore smaller than the one added @@ -922,7 +915,7 @@ public final class DocumentNodeStore // first revision is the ancestor (tailSet is inclusive) // do not undo changes for this revision Revision base = it.next(); - Map operations = new HashMap(); + Map operations = Maps.newHashMap(); while (it.hasNext()) { Revision reset = it.next(); getRoot(reset).compareAgainstBaseState(getRoot(base), @@ -1227,25 +1220,12 @@ public final class DocumentNodeStore return; } try { - - // does not create new revisions + // split documents (does not create new revisions) backgroundSplit(); - - // we need to protect backgroundRead as well, - // as increment set the head revision in the read operation - // (the read operation might see changes from other cluster nodes, - // and so create a new head revision for the current cluster node, - // to order revisions) - Lock writeLock = backgroundOperationLock.writeLock(); - writeLock.lock(); - try { - backgroundWrite(); - backgroundRead(); - dispatcher.contentChanged(getRoot(), null); - } finally { - writeLock.unlock(); - } - + // write back pending updates to _lastRev + backgroundWrite(); + // pull in changes from other cluster nodes + backgroundRead(true); } catch (RuntimeException e) { if (isDisposed.get()) { return; @@ -1261,7 +1241,13 @@ public final class DocumentNodeStore clusterNodeInfo.renewLease(asyncDelay); } - void backgroundRead() { + /** + * Perform a background read and make external changes visible. + * + * @param dispatchChange whether to dispatch external changes + * to {@link #dispatcher}. + */ + void backgroundRead(boolean dispatchChange) { String id = Utils.getIdFromPath("/"); NodeDocument doc = store.find(Collection.NODES, id, asyncDelay); if (doc == null) { @@ -1270,37 +1256,51 @@ public final class DocumentNodeStore Map lastRevMap = doc.getLastRev(); Revision.RevisionComparator revisionComparator = getRevisionComparator(); - boolean hasNewRevisions = false; // the (old) head occurred first Revision headSeen = Revision.newRevision(0); // then we saw this new revision (from another cluster node) Revision otherSeen = Revision.newRevision(0); + + Map externalChanges = Maps.newHashMap(); for (Map.Entry e : lastRevMap.entrySet()) { int machineId = e.getKey(); if (machineId == clusterId) { + // ignore own lastRev continue; } Revision r = e.getValue(); Revision last = lastKnownRevision.get(machineId); if (last == null || r.compareRevisionTime(last) > 0) { - if (!hasNewRevisions) { - // publish our revision once before any foreign revision - - // the latest revisions of the current cluster node - // happened before the latest revisions of other cluster nodes - revisionComparator.add(newRevision(), headSeen); - } - hasNewRevisions = true; lastKnownRevision.put(machineId, r); - revisionComparator.add(r, otherSeen); + externalChanges.put(r, otherSeen); } } - if (hasNewRevisions) { + + if (!externalChanges.isEmpty()) { + // invalidate caches store.invalidateCache(); // TODO only invalidate affected items docChildrenCache.invalidateAll(); - // the head revision is after other revisions - setHeadRevision(newRevision()); + + // make sure update to revision comparator is atomic + // and no local commit is in progress + backgroundOperationLock.writeLock().lock(); + try { + // the latest revisions of the current cluster node + // happened before the latest revisions of other cluster nodes + revisionComparator.add(newRevision(), headSeen); + // then we saw other revisions + for (Map.Entry e : externalChanges.entrySet()) { + revisionComparator.add(e.getKey(), e.getValue()); + } + // the new head revision is after other revisions + setHeadRevision(newRevision()); + if (dispatchChange) { + dispatcher.contentChanged(getRoot(), null); + } + } finally { + backgroundOperationLock.writeLock().unlock(); + } } revisionComparator.purge(Revision.getCurrentTimestamp() - REMEMBER_REVISION_ORDER_MILLIS); } @@ -1330,50 +1330,13 @@ public final class DocumentNodeStore if (unsavedLastRevisions.getPaths().isEmpty()) { return; } - ArrayList paths = new ArrayList(unsavedLastRevisions.getPaths()); - // sort by depth (high depth first), then path - Collections.sort(paths, PathComparator.INSTANCE); - - UpdateOp updateOp = null; - Revision lastRev = null; - List ids = new ArrayList(); - for (int i = 0; i < paths.size(); ) { - String p = paths.get(i); - Revision r = unsavedLastRevisions.get(p); - if (r == null) { - i++; - continue; - } - int size = ids.size(); - if (updateOp == null) { - // create UpdateOp - Commit commit = new Commit(this, null, r); - commit.touchNode(p); - updateOp = commit.getUpdateOperationForNode(p); - lastRev = r; - ids.add(Utils.getIdFromPath(p)); - i++; - } else if (r.equals(lastRev)) { - // use multi update when possible - ids.add(Utils.getIdFromPath(p)); - i++; - } - // call update if any of the following is true: - // - this is the second-to-last or last path (update last path, the - // root document, individually) - // - revision is not equal to last revision (size of ids didn't change) - // - the update limit is reached - if (i + 2 > paths.size() - || size == ids.size() - || ids.size() >= BACKGROUND_MULTI_UPDATE_LIMIT) { - store.update(Collection.NODES, ids, updateOp); - for (String id : ids) { - unsavedLastRevisions.remove(Utils.getPathFromId(id)); - } - ids.clear(); - updateOp = null; - lastRev = null; - } + // write back the pending _lastRevs with an exclusive lock to + // ensure consistency + backgroundOperationLock.writeLock().lock(); + try { + unsavedLastRevisions.persist(this); + } finally { + backgroundOperationLock.writeLock().unlock(); } } @@ -1435,7 +1398,7 @@ public final class DocumentNodeStore long minValue = Commit.getModified(minTimestamp); String fromKey = Utils.getKeyLowerLimit(path); String toKey = Utils.getKeyUpperLimit(path); - Set paths = new HashSet(); + Set paths = Sets.newHashSet(); for (NodeDocument doc : store.query(Collection.NODES, fromKey, toKey, NodeDocument.MODIFIED, minValue, Integer.MAX_VALUE)) { paths.add(Utils.getPathFromId(doc.getId())); @@ -1495,7 +1458,7 @@ public final class DocumentNodeStore } private void diffFewChildren(JsopWriter w, Node.Children fromChildren, Revision fromRev, Node.Children toChildren, Revision toRev) { - Set childrenSet = new HashSet(toChildren.children); + Set childrenSet = Sets.newHashSet(toChildren.children); for (String n : fromChildren.children) { if (!childrenSet.contains(n)) { w.tag('-').value(n).newline(); @@ -1513,7 +1476,7 @@ public final class DocumentNodeStore } } } - childrenSet = new HashSet(fromChildren.children); + childrenSet = Sets.newHashSet(fromChildren.children); for (String n : toChildren.children) { if (!childrenSet.contains(n)) { w.tag('+').key(n).object().endObject().newline(); Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1567066&r1=1567065&r2=1567066&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Tue Feb 11 11:44:07 2014 @@ -547,22 +547,6 @@ public class NodeDocument extends Docume } /** - * Returns true if this node is considered deleted at the - * given readRevision. - * - * @param context the revision context. - * @param readRevision the read revision. - * @param validRevisions the set of revisions already checked against - * readRevision and considered valid. - * @return true if deleted, false otherwise. - */ - public boolean isDeleted(RevisionContext context, - Revision readRevision, - Set validRevisions) { - return getLiveRevision(context, readRevision, validRevisions) == null; - } - - /** * Get the earliest (oldest) revision where the node was alive at or before * the provided revision, if the node was alive at the given revision. * Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1567066&r1=1567065&r2=1567066&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java Tue Feb 11 11:44:07 2014 @@ -16,19 +16,24 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import org.apache.jackrabbit.oak.plugins.document.util.Utils; + import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; /** * Keeps track of when nodes where last modified. To be persisted later by @@ -36,6 +41,11 @@ import static com.google.common.base.Pre */ class UnsavedModifications { + /** + * The maximum number of document to update at once in a multi update. + */ + static final int BACKGROUND_MULTI_UPDATE_LIMIT = 10000; + private final ConcurrentHashMap map = new ConcurrentHashMap(); /** @@ -76,11 +86,6 @@ class UnsavedModifications { return map.get(path); } - @CheckForNull - public Revision remove(String path) { - return map.remove(path); - } - @Nonnull public Collection getPaths() { return map.keySet(); @@ -127,4 +132,63 @@ class UnsavedModifications { }); } } + + /** + * Persist the pending changes to _lastRev to the given store. This method + * does not guarantee consistency when there are concurrent updates on + * this instance through {@link #put(String, Revision)}. The caller must + * use proper synchronization to ensure no paths are added while this method + * is called. + * + * @param store the document node store. + */ + public void persist(@Nonnull DocumentNodeStore store) { + checkNotNull(store); + + ArrayList paths = new ArrayList(getPaths()); + // sort by depth (high depth first), then path + Collections.sort(paths, PathComparator.INSTANCE); + + UpdateOp updateOp = null; + Revision lastRev = null; + List ids = new ArrayList(); + for (int i = 0; i < paths.size(); ) { + String p = paths.get(i); + Revision r = map.get(p); + if (r == null) { + i++; + continue; + } + int size = ids.size(); + if (updateOp == null) { + // create UpdateOp + Commit commit = new Commit(store, null, r); + commit.touchNode(p); + updateOp = commit.getUpdateOperationForNode(p); + lastRev = r; + ids.add(Utils.getIdFromPath(p)); + i++; + } else if (r.equals(lastRev)) { + // use multi update when possible + ids.add(Utils.getIdFromPath(p)); + i++; + } + // call update if any of the following is true: + // - this is the second-to-last or last path (update last path, the + // root document, individually) + // - revision is not equal to last revision (size of ids didn't change) + // - the update limit is reached + if (i + 2 > paths.size() + || size == ids.size() + || ids.size() >= BACKGROUND_MULTI_UPDATE_LIMIT) { + store.getDocumentStore().update(NODES, ids, updateOp); + for (String id : ids) { + map.remove(Utils.getPathFromId(id)); + } + ids.clear(); + updateOp = null; + lastRev = null; + } + } + } } Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java?rev=1567066&r1=1567065&r2=1567066&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java Tue Feb 11 11:44:07 2014 @@ -35,7 +35,7 @@ public class BackgroundWriteTest { new TestStore()).setAsyncDelay(0).open(); List paths = new ArrayList(); StringBuilder sb = new StringBuilder(); - for (int i = 0; paths.size() < DocumentNodeStore.BACKGROUND_MULTI_UPDATE_LIMIT * 2; i++) { + for (int i = 0; paths.size() < UnsavedModifications.BACKGROUND_MULTI_UPDATE_LIMIT * 2; i++) { String child = "node-" + i; sb.append("+\"").append(child).append("\":{}"); for (int j = 0; j < 1000; j++) { @@ -62,7 +62,7 @@ public class BackgroundWriteTest { public void update(Collection collection, List keys, UpdateOp updateOp) { - assertTrue(keys.size() <= DocumentNodeStore.BACKGROUND_MULTI_UPDATE_LIMIT); + assertTrue(keys.size() <= UnsavedModifications.BACKGROUND_MULTI_UPDATE_LIMIT); super.update(collection, keys, updateOp); } }