Return-Path: X-Original-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Delivered-To: apmail-jackrabbit-oak-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E8911079D for ; Thu, 13 Feb 2014 15:03:31 +0000 (UTC) Received: (qmail 37721 invoked by uid 500); 13 Feb 2014 15:03:30 -0000 Delivered-To: apmail-jackrabbit-oak-commits-archive@jackrabbit.apache.org Received: (qmail 37671 invoked by uid 500); 13 Feb 2014 15:03:28 -0000 Mailing-List: contact oak-commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: oak-dev@jackrabbit.apache.org Delivered-To: mailing list oak-commits@jackrabbit.apache.org Received: (qmail 37662 invoked by uid 99); 13 Feb 2014 15:03:26 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Feb 2014 15:03:26 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Feb 2014 15:03:22 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5A3D22388860; Thu, 13 Feb 2014 15:03:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1567943 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/document/ Date: Thu, 13 Feb 2014 15:03:01 -0000 To: oak-commits@jackrabbit.apache.org From: mreutegg@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140213150301.5A3D22388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mreutegg Date: Thu Feb 13 15:03:00 2014 New Revision: 1567943 URL: http://svn.apache.org/r1567943 Log: 
OAK-1420: ConcurrentAddIT fails on buildbot Potential fix. With these changes ConcurrentConflictTest.concurrentUpdates() now runs successfully -> enabled. Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1567943&r1=1567942&r2=1567943&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java Thu Feb 13 15:03:00 2014 @@ -36,6 +36,8 @@ import org.apache.jackrabbit.oak.commons import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; +import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SPLIT_CANDIDATE_THRESHOLD; /** @@ -243,7 +245,7 @@ public class Commit { try { if (newNodes.size() > 0) { // set commit root on new nodes - if (!store.create(Collection.NODES, newNodes)) { + if (!store.create(NODES, newNodes)) { // some of the documents already exist: // try to apply all changes one by one for (UpdateOp op : newNodes) { @@ -276,14 +278,41 @@ public class Commit { if (changedNodes.size() > 0 || !commitRoot.isNew()) { NodeDocument.setRevision(commitRoot, revision, commitValue); opLog.add(commitRoot); - createOrUpdateNode(store, commitRoot); + if 
(baseBranchRevision == null) { + // create a clone of the commitRoot in order + // to set isNew to false. If we get here the + // commitRoot document already exists and + // only needs an update + UpdateOp commit = commitRoot.clone(commitRoot.getId()); + commit.setNew(false); + // only set revision on commit root when there is + // no collision for this commit revision + commit.containsMapEntry(COLLISIONS, revision, false); + NodeDocument before = store.findAndUpdate(NODES, commit); + if (before == null) { + String msg = "Conflicting concurrent change. " + + "Update operation failed: " + commitRoot; + throw new MicroKernelException(msg); + } else { + // if we get here the commit was successful and + // the commit revision is set on the commitRoot + // document for this commit. + // now check for conflicts/collisions by other commits. + // use original commitRoot operation with + // correct isNew flag. + checkConflicts(commitRoot, before); + checkSplitCandidate(before); + } + } else { + // this is a branch commit, do not fail on collisions now + // trying to merge the branch will fail later + createOrUpdateNode(store, commitRoot); + } operations.put(commitRootPath, commitRoot); } } catch (MicroKernelException e) { - rollback(newNodes, opLog); - String msg = "Exception committing " + diff.toString(); - LOG.debug(msg, e); - throw new MicroKernelException(msg, e); + rollback(newNodes, opLog, commitRoot); + throw e; } } @@ -309,14 +338,14 @@ public class Commit { if (op.isNew()) { NodeDocument.setChildrenFlag(op, true); } else { - NodeDocument nd = store.getIfCached(Collection.NODES, Utils.getIdFromPath(parentPath)); + NodeDocument nd = store.getIfCached(NODES, Utils.getIdFromPath(parentPath)); if (nd != null && nd.hasChildren()) { continue; } NodeDocument.setChildrenFlag(op, true); } } else { - NodeDocument nd = store.getIfCached(Collection.NODES, Utils.getIdFromPath(parentPath)); + NodeDocument nd = store.getIfCached(NODES, Utils.getIdFromPath(parentPath)); if (nd != 
null && nd.hasChildren()) { //Flag already set to true. Nothing to do continue; @@ -328,15 +357,20 @@ public class Commit { } } - private void rollback(ArrayList newDocuments, ArrayList changed) { + private void rollback(List newDocuments, + List changed, + UpdateOp commitRoot) { DocumentStore store = nodeStore.getDocumentStore(); for (UpdateOp op : changed) { UpdateOp reverse = op.getReverseOperation(); - store.createOrUpdate(Collection.NODES, reverse); + store.createOrUpdate(NODES, reverse); } for (UpdateOp op : newDocuments) { - store.remove(Collection.NODES, op.id); + store.remove(NODES, op.id); } + UpdateOp removeCollision = new UpdateOp(commitRoot.getId(), false); + NodeDocument.removeCollision(removeCollision, revision); + store.createOrUpdate(NODES, removeCollision); } /** @@ -346,13 +380,35 @@ public class Commit { * @param store the store * @param op the operation */ - public void createOrUpdateNode(DocumentStore store, UpdateOp op) { + private void createOrUpdateNode(DocumentStore store, UpdateOp op) { + NodeDocument doc = store.createOrUpdate(NODES, op); + checkConflicts(op, doc); + checkSplitCandidate(doc); + } + + private void checkSplitCandidate(@Nullable NodeDocument doc) { + if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) { + nodeStore.addSplitCandidate(doc.getId()); + } + } + + /** + * Checks if the update operation introduced any conflicts on the given + * document. The document shows the state right before the operation was + * applied. + * + * @param op the update operation. + * @param before how the document looked before the update was applied or + * {@code null} if it didn't exist before. 
+ */ + private void checkConflicts(@Nonnull UpdateOp op, + @Nullable NodeDocument before) { + DocumentStore store = nodeStore.getDocumentStore(); collisions.clear(); - NodeDocument doc = store.createOrUpdate(Collection.NODES, op); if (baseRevision != null) { Revision newestRev = null; - if (doc != null) { - newestRev = doc.getNewestRevision(nodeStore, revision, + if (before != null) { + newestRev = before.getNewestRevision(nodeStore, revision, new CollisionHandler() { @Override void concurrentModification(Revision other) { @@ -363,19 +419,19 @@ public class Commit { String conflictMessage = null; if (newestRev == null) { if (op.isDelete() || !op.isNew()) { - conflictMessage = "The node " + + conflictMessage = "The node " + op.getId() + " does not exist or is already deleted"; } } else { if (op.isNew()) { - conflictMessage = "The node " + + conflictMessage = "The node " + op.getId() + " was already added in revision\n" + newestRev; } else if (nodeStore.isRevisionNewer(newestRev, baseRevision) - && (op.isDelete() || isConflicting(doc, op))) { - conflictMessage = "The node " + + && (op.isDelete() || isConflicting(before, op))) { + conflictMessage = "The node " + op.getId() + " was changed in revision\n" + newestRev + - ", which was applied after the base revision\n" + + ", which was applied after the base revision\n" + baseRevision; } } @@ -384,10 +440,10 @@ public class Commit { // -> check for collisions and conflict (concurrent updates // on a node are possible if property updates do not overlap) // TODO: unify above conflict detection and isConflicting() - if (!collisions.isEmpty() && isConflicting(doc, op)) { + if (!collisions.isEmpty() && isConflicting(before, op)) { for (Revision r : collisions) { // mark collisions on commit root - Collision c = new Collision(doc, r, op, revision, nodeStore); + Collision c = new Collision(before, r, op, revision, nodeStore); if (c.mark(store).equals(revision)) { // our revision was marked if (baseRevision.isBranch()) { @@ 
-405,16 +461,12 @@ public class Commit { } } if (conflictMessage != null) { - conflictMessage += ", before\n" + revision + - "; document:\n" + (doc == null ? "" : doc.format()) + + conflictMessage += ", before\n" + revision + + "; document:\n" + (before == null ? "" : before.format()) + ",\nrevision order:\n" + nodeStore.getRevisionComparator(); throw new MicroKernelException(conflictMessage); } } - - if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) { - nodeStore.addSplitCandidate(doc.getId()); - } } /** Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java?rev=1567943&r1=1567942&r2=1567943&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java Thu Feb 13 15:03:00 2014 @@ -74,6 +74,10 @@ public final class UpdateOp { public boolean isNew() { return isNew; } + + public void setNew(boolean isNew) { + this.isNew = isNew; + } void setDelete(boolean isDelete) { this.isDelete = isDelete; Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java?rev=1567943&r1=1567942&r2=1567943&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java (original) +++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java Thu Feb 13 15:03:00 2014 @@ -39,7 +39,8 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.assertEquals; /** - * ConcurrentConflictTest... + * Updates multiple nodes in the same commit with multiple threads and verifies + * the commit is atomic. */ public class ConcurrentConflictTest extends BaseDocumentMKTest { @@ -47,7 +48,7 @@ public class ConcurrentConflictTest exte private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConflictTest.class); private static final int NUM_WRITERS = 3; private static final int NUM_NODES = 10; - private static final int NUM_TRANSFERS_PER_THREAD = 10; + private static final int NUM_TRANSFERS_PER_THREAD = 100; private DocumentStore store; private List kernels = new ArrayList(); private final StringBuilder logBuffer = new StringBuilder(); @@ -77,7 +78,6 @@ public class ConcurrentConflictTest exte concurrentUpdates(true); } - @Ignore @Test public void concurrentUpdates() throws Exception { concurrentUpdates(false); @@ -114,7 +114,7 @@ public class ConcurrentConflictTest exte } catch (Exception e) { exceptions.add(e); } - log("conflicts: " + conflictSet); + log("conflicts (" + conflictSet.cardinality() + "): " + conflictSet); } private boolean transfer() throws Exception {