jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r1567943 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/document/
Date Thu, 13 Feb 2014 15:03:01 GMT
Author: mreutegg
Date: Thu Feb 13 15:03:00 2014
New Revision: 1567943

URL: http://svn.apache.org/r1567943
Log:
OAK-1420: ConcurrentAddIT fails on buildbot

Potential fix. With these changes ConcurrentConflictTest.concurrentUpdates() now runs successfully
-> enabled.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1567943&r1=1567942&r2=1567943&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java	Thu Feb 13 15:03:00 2014
@@ -36,6 +36,8 @@ import org.apache.jackrabbit.oak.commons
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SPLIT_CANDIDATE_THRESHOLD;
 
 /**
@@ -243,7 +245,7 @@ public class Commit {
         try {
             if (newNodes.size() > 0) {
                 // set commit root on new nodes
-                if (!store.create(Collection.NODES, newNodes)) {
+                if (!store.create(NODES, newNodes)) {
                     // some of the documents already exist:
                     // try to apply all changes one by one
                     for (UpdateOp op : newNodes) {
@@ -276,14 +278,41 @@ public class Commit {
             if (changedNodes.size() > 0 || !commitRoot.isNew()) {
                 NodeDocument.setRevision(commitRoot, revision, commitValue);
                 opLog.add(commitRoot);
-                createOrUpdateNode(store, commitRoot);
+                if (baseBranchRevision == null) {
+                    // create a clone of the commitRoot in order
+                    // to set isNew to false. If we get here the
+                    // commitRoot document already exists and
+                    // only needs an update
+                    UpdateOp commit = commitRoot.clone(commitRoot.getId());
+                    commit.setNew(false);
+                    // only set revision on commit root when there is
+                    // no collision for this commit revision
+                    commit.containsMapEntry(COLLISIONS, revision, false);
+                    NodeDocument before = store.findAndUpdate(NODES, commit);
+                    if (before == null) {
+                        String msg = "Conflicting concurrent change. " +
+                                "Update operation failed: " + commitRoot;
+                        throw new MicroKernelException(msg);
+                    } else {
+                        // if we get here the commit was successful and
+                        // the commit revision is set on the commitRoot
+                        // document for this commit.
+                        // now check for conflicts/collisions by other commits.
+                        // use original commitRoot operation with
+                        // correct isNew flag.
+                        checkConflicts(commitRoot, before);
+                        checkSplitCandidate(before);
+                    }
+                } else {
+                    // this is a branch commit, do not fail on collisions now
+                    // trying to merge the branch will fail later
+                    createOrUpdateNode(store, commitRoot);
+                }
                 operations.put(commitRootPath, commitRoot);
             }
         } catch (MicroKernelException e) {
-            rollback(newNodes, opLog);
-            String msg = "Exception committing " + diff.toString();
-            LOG.debug(msg, e);
-            throw new MicroKernelException(msg, e);
+            rollback(newNodes, opLog, commitRoot);
+            throw e;
         }
     }
 
@@ -309,14 +338,14 @@ public class Commit {
                 if (op.isNew()) {
                     NodeDocument.setChildrenFlag(op, true);
                 } else {
-                    NodeDocument nd = store.getIfCached(Collection.NODES, Utils.getIdFromPath(parentPath));
+                    NodeDocument nd = store.getIfCached(NODES, Utils.getIdFromPath(parentPath));
                     if (nd != null && nd.hasChildren()) {
                         continue;
                     }
                     NodeDocument.setChildrenFlag(op, true);
                 }
             } else {
-                NodeDocument nd = store.getIfCached(Collection.NODES, Utils.getIdFromPath(parentPath));
+                NodeDocument nd = store.getIfCached(NODES, Utils.getIdFromPath(parentPath));
                 if (nd != null && nd.hasChildren()) {
                     //Flag already set to true. Nothing to do
                     continue;
@@ -328,15 +357,20 @@ public class Commit {
         }
     }
     
-    private void rollback(ArrayList<UpdateOp> newDocuments, ArrayList<UpdateOp> changed) {
+    private void rollback(List<UpdateOp> newDocuments,
+                          List<UpdateOp> changed,
+                          UpdateOp commitRoot) {
         DocumentStore store = nodeStore.getDocumentStore();
         for (UpdateOp op : changed) {
             UpdateOp reverse = op.getReverseOperation();
-            store.createOrUpdate(Collection.NODES, reverse);
+            store.createOrUpdate(NODES, reverse);
         }
         for (UpdateOp op : newDocuments) {
-            store.remove(Collection.NODES, op.id);
+            store.remove(NODES, op.id);
         }
+        UpdateOp removeCollision = new UpdateOp(commitRoot.getId(), false);
+        NodeDocument.removeCollision(removeCollision, revision);
+        store.createOrUpdate(NODES, removeCollision);
     }
 
     /**
@@ -346,13 +380,35 @@ public class Commit {
      * @param store the store
      * @param op the operation
      */
-    public void createOrUpdateNode(DocumentStore store, UpdateOp op) {
+    private void createOrUpdateNode(DocumentStore store, UpdateOp op) {
+        NodeDocument doc = store.createOrUpdate(NODES, op);
+        checkConflicts(op, doc);
+        checkSplitCandidate(doc);
+    }
+
+    private void checkSplitCandidate(@Nullable NodeDocument doc) {
+        if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {
+            nodeStore.addSplitCandidate(doc.getId());
+        }
+    }
+
+    /**
+     * Checks if the update operation introduced any conflicts on the given
+     * document. The document shows the state right before the operation was
+     * applied.
+     *
+     * @param op the update operation.
+     * @param before how the document looked before the update was applied or
+     *               {@code null} if it didn't exist before.
+     */
+    private void checkConflicts(@Nonnull UpdateOp op,
+                                @Nullable NodeDocument before) {
+        DocumentStore store = nodeStore.getDocumentStore();
         collisions.clear();
-        NodeDocument doc = store.createOrUpdate(Collection.NODES, op);
         if (baseRevision != null) {
             Revision newestRev = null;
-            if (doc != null) {
-                newestRev = doc.getNewestRevision(nodeStore, revision,
+            if (before != null) {
+                newestRev = before.getNewestRevision(nodeStore, revision,
                         new CollisionHandler() {
                             @Override
                             void concurrentModification(Revision other) {
@@ -363,19 +419,19 @@ public class Commit {
             String conflictMessage = null;
             if (newestRev == null) {
                 if (op.isDelete() || !op.isNew()) {
-                    conflictMessage = "The node " + 
+                    conflictMessage = "The node " +
                             op.getId() + " does not exist or is already deleted";
                 }
             } else {
                 if (op.isNew()) {
-                    conflictMessage = "The node " + 
+                    conflictMessage = "The node " +
                             op.getId() + " was already added in revision\n" +
                             newestRev;
                 } else if (nodeStore.isRevisionNewer(newestRev, baseRevision)
-                        && (op.isDelete() || isConflicting(doc, op))) {
-                    conflictMessage = "The node " + 
+                        && (op.isDelete() || isConflicting(before, op))) {
+                    conflictMessage = "The node " +
                             op.getId() + " was changed in revision\n" + newestRev +
-                            ", which was applied after the base revision\n" + 
+                            ", which was applied after the base revision\n" +
                             baseRevision;
                 }
             }
@@ -384,10 +440,10 @@ public class Commit {
                 // -> check for collisions and conflict (concurrent updates
                 // on a node are possible if property updates do not overlap)
                 // TODO: unify above conflict detection and isConflicting()
-                if (!collisions.isEmpty() && isConflicting(doc, op)) {
+                if (!collisions.isEmpty() && isConflicting(before, op)) {
                     for (Revision r : collisions) {
                         // mark collisions on commit root
-                        Collision c = new Collision(doc, r, op, revision, nodeStore);
+                        Collision c = new Collision(before, r, op, revision, nodeStore);
                         if (c.mark(store).equals(revision)) {
                             // our revision was marked
                             if (baseRevision.isBranch()) {
@@ -405,16 +461,12 @@ public class Commit {
                 }
             }
             if (conflictMessage != null) {
-                conflictMessage += ", before\n" + revision + 
-                        "; document:\n" + (doc == null ? "" : doc.format()) +
+                conflictMessage += ", before\n" + revision +
+                        "; document:\n" + (before == null ? "" : before.format()) +
                         ",\nrevision order:\n" + nodeStore.getRevisionComparator();
                 throw new MicroKernelException(conflictMessage);
             }
         }
-
-        if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {
-            nodeStore.addSplitCandidate(doc.getId());
-        }
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java?rev=1567943&r1=1567942&r2=1567943&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java	Thu Feb 13 15:03:00 2014
@@ -74,6 +74,10 @@ public final class UpdateOp {
     public boolean isNew() {
         return isNew;
     }
+
+    public void setNew(boolean isNew) {
+        this.isNew = isNew;
+    }
     
     void setDelete(boolean isDelete) {
         this.isDelete = isDelete;

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java?rev=1567943&r1=1567942&r2=1567943&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java	Thu Feb 13 15:03:00 2014
@@ -39,7 +39,8 @@ import org.slf4j.LoggerFactory;
 import static org.junit.Assert.assertEquals;
 
 /**
- * <code>ConcurrentConflictTest</code>...
+ * Updates multiple nodes in the same commit with multiple threads and verifies
+ * the commit is atomic.
  */
 public class ConcurrentConflictTest extends BaseDocumentMKTest {
 
@@ -47,7 +48,7 @@ public class ConcurrentConflictTest exte
     private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConflictTest.class);
     private static final int NUM_WRITERS = 3;
     private static final int NUM_NODES = 10;
-    private static final int NUM_TRANSFERS_PER_THREAD = 10;
+    private static final int NUM_TRANSFERS_PER_THREAD = 100;
     private DocumentStore store;
     private List<DocumentMK> kernels = new ArrayList<DocumentMK>();
     private final StringBuilder logBuffer = new StringBuilder();
@@ -77,7 +78,6 @@ public class ConcurrentConflictTest exte
         concurrentUpdates(true);
     }
 
-    @Ignore
     @Test
     public void concurrentUpdates() throws Exception {
         concurrentUpdates(false);
@@ -114,7 +114,7 @@ public class ConcurrentConflictTest exte
                     } catch (Exception e) {
                         exceptions.add(e);
                     }
-                    log("conflicts: " + conflictSet);
+                    log("conflicts (" + conflictSet.cardinality() + "): " + conflictSet);
                 }
 
                 private boolean transfer() throws Exception {



Mime
View raw message