jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r1479832 - /jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java
Date Tue, 07 May 2013 09:42:12 GMT
Author: mreutegg
Date: Tue May  7 09:42:12 2013
New Revision: 1479832

URL: http://svn.apache.org/r1479832
Log:
OAK-619 Lock-free MongoMK implementation
- Additional conflict test (currently fails and is ignored)

Added:
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java   (with props)

Added: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java?rev=1479832&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java Tue May  7 09:42:12 2013
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.jackrabbit.mk.api.MicroKernel;
+import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * <code>ConcurrentConflictTest</code>...
+ */
+public class ConcurrentConflictTest extends BaseMongoMKTest {
+
+    private static final boolean USE_LOGGER = true;
+    private static final Logger log = LoggerFactory.getLogger(ConcurrentConflictTest.class);
+    private static final int NUM_WRITERS = 3;
+    private static final int NUM_NODES = 10;
+    private static final int NUM_TRANSFERS_PER_THREAD = 10;
+    private DocumentStore store;
+    private List<MongoMK> kernels = new ArrayList<MongoMK>();
+    private final StringBuilder logBuffer = new StringBuilder();
+
+    @Before
+    public void setup() {
+        logBuffer.setLength(0);
+        this.store = new MemoryDocumentStore();
+        MongoMK mk = openMongoMK();
+        for (int i = 0; i < NUM_NODES; i++) {
+            mk.commit("/", "+\"node-" + i + "\":{\"value\":100}", null, null);
+        }
+        for (int i = 0; i < NUM_WRITERS; i++) {
+            kernels.add(openMongoMK());
+        }
+    }
+
+    private MongoMK openMongoMK() {
+        return new MongoMK.Builder().setAsyncDelay(10).setDocumentStore(store).open();
+    }
+
+    @Ignore
+    @Test
+    public void concurrentUpdatesWithBranch() throws Exception {
+        concurrentUpdates(true);
+    }
+
+    @Ignore
+    @Test
+    public void concurrentUpdates() throws Exception {
+        concurrentUpdates(false);
+    }
+
+    private void concurrentUpdates(final boolean useBranch) throws Exception {
+        log.info("====== Start test =======");
+        final AtomicInteger conflicts = new AtomicInteger();
+        final List<Exception> exceptions = Collections.synchronizedList(
+                new ArrayList<Exception>());
+        List<Thread> writers = new ArrayList<Thread>();
+        for (final MicroKernel mk : kernels) {
+            writers.add(new Thread(new Runnable() {
+                Random random = new Random();
+                Map<Integer, JSONObject> nodes = new HashMap<Integer, JSONObject>();
+                @Override
+                public void run() {
+                    BitSet conflictSet = new BitSet();
+                    int numTransfers = NUM_TRANSFERS_PER_THREAD;
+                    try {
+                        while (numTransfers > 0) {
+                            try {
+                                if (!transfer()) {
+                                    continue;
+                                }
+                            } catch (MicroKernelException e) {
+                                log("Failed transfer @" + mk.getHeadRevision());
+                                // assume conflict
+                                conflicts.incrementAndGet();
+                                conflictSet.set(numTransfers);
+                            }
+                            numTransfers--;
+                        }
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    }
+                    log("conflicts: " + conflictSet);
+                }
+
+                private boolean transfer() throws Exception {
+                    // read 3 random nodes and re-distribute values
+                    nodes.clear();
+                    while (nodes.size() < 3) {
+                        nodes.put(random.nextInt(NUM_NODES), null);
+                    }
+                    String rev;
+                    if (useBranch) {
+                        rev = mk.branch(null);
+                    } else {
+                        rev = mk.getHeadRevision();
+                    }
+                    int sum = 0;
+                    for (Map.Entry<Integer, JSONObject> entry : nodes.entrySet()) {
+                        String json = mk.getNodes("/node-" + entry.getKey(), rev, 0, 0, 1000, null);
+                        JSONParser parser = new JSONParser();
+                        JSONObject obj = (JSONObject) parser.parse(json);
+                        entry.setValue(obj);
+                        sum += (Long) obj.get("value");
+                    }
+                    if (sum < 60) {
+                        // retry with other nodes
+                        return false;
+                    }
+                    StringBuilder jsop = new StringBuilder();
+                    boolean withdrawn = false;
+                    for (Map.Entry<Integer, JSONObject> entry : nodes.entrySet()) {
+                        long value = (Long) entry.getValue().get("value");
+                        jsop.append("^\"/node-").append(entry.getKey());
+                        jsop.append("/value\":");
+                        if (value >= 20 && ! withdrawn) {
+                            jsop.append(value - 20);
+                            withdrawn = true;
+                        } else {
+                            jsop.append(value + 10);
+                        }
+                    }
+                    String oldRev = rev;
+                    rev = mk.commit("", jsop.toString(), rev, null);
+                    if (useBranch) {
+                        rev = mk.merge(rev, null);
+                    }
+                    log("Successful transfer @" + oldRev + ": " + jsop.toString() + " (new rev: " + rev + ")");
+                    return true;
+                }
+            }));
+        }
+        for (Thread t : writers) {
+            t.start();
+        }
+        for (Thread t : writers) {
+            t.join();
+        }
+        // dispose will flush all pending revisions
+        for (MongoMK mk : kernels) {
+            mk.dispose();
+        }
+        MongoMK mk = openMongoMK();
+        String rev = mk.getHeadRevision();
+        long sum = 0;
+        for (int i = 0; i < NUM_NODES; i++) {
+            String json = mk.getNodes("/node-" + i, rev, 0, 0, 1000, null);
+            JSONParser parser = new JSONParser();
+            JSONObject obj = (JSONObject) parser.parse(json);
+            sum += (Long) obj.get("value");
+        }
+        log("Conflict rate: " + conflicts.get() +
+                "/" + (NUM_WRITERS * NUM_TRANSFERS_PER_THREAD));
+        System.out.print(logBuffer);
+        assertEquals(NUM_NODES * 100, sum);
+        if (!exceptions.isEmpty()) {
+            throw exceptions.get(0);
+        }
+    }
+
+    private void log(String msg) {
+        if (USE_LOGGER) {
+            log.info(msg);
+        } else {
+            synchronized (logBuffer) {
+                logBuffer.append(msg).append("\n");
+            }
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/ConcurrentConflictTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL



Mime
View raw message