jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From thom...@apache.org
Subject svn commit: r1464576 - in /jackrabbit/oak/trunk/oak-mongomk/src: main/java/org/apache/jackrabbit/mongomk/osgi/ main/java/org/apache/jackrabbit/mongomk/prototype/ test/java/org/apache/jackrabbit/mongomk/prototype/
Date Thu, 04 Apr 2013 14:48:46 GMT
Author: thomasm
Date: Thu Apr  4 14:48:46 2013
New Revision: 1464576

URL: http://svn.apache.org/r1464576
Log:
OAK-746 Builder for the MongoMK

Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java
Thu Apr  4 14:48:46 2013
@@ -65,7 +65,8 @@ public class MongoMicroKernelService {
     private MongoMK mk;
 
     @Activate
-    private void activate(BundleContext context,Map<String,?> config) throws Exception
{
+    private void activate(BundleContext context, Map<String, ?> config)
+            throws Exception {
         String host = PropertiesUtil.toString(config.get(PROP_HOST), DEFAULT_HOST);
         int port = PropertiesUtil.toInteger(config.get(PROP_PORT), DEFAULT_PORT);
         String db = PropertiesUtil.toString(config.get(PROP_DB), DEFAULT_DB);
@@ -78,11 +79,11 @@ public class MongoMicroKernelService {
 
         logger.info("Connected to database {}", mongoDB);
 
-        mk = new MongoMK(mongoDB, 0);
+        mk = new MongoMK.Builder().setMongoDB(mongoDB).open();
 
         Properties props = new Properties();
-        props.setProperty("oak.mk.type","mongo");
-        reg = context.registerService(MicroKernel.class.getName(),mk,props);
+        props.setProperty("oak.mk.type", "mongo");
+        reg = context.registerService(MicroKernel.class.getName(), mk, props);
     }
 
     @Deactivate

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
Thu Apr  4 14:48:46 2013
@@ -90,6 +90,11 @@ public class Commit {
         diff.newline();
     }
     
+    public void touchNode(String path) {
+        UpdateOp op = getUpdateOperationForNode(path);
+        op.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
       
+    }
+    
     void updateProperty(String path, String propertyName, String value) {
         UpdateOp op = getUpdateOperationForNode(path);
         String key = Utils.escapePropertyName(propertyName);

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java
Thu Apr  4 14:48:46 2013
@@ -35,15 +35,32 @@ public interface DocumentStore {
     enum Collection { NODES }
 
     /**
-     * Get a document. The returned map is a clone (the caller
-     * can modify it without affecting the stored version).
-     *
+     * Get a document.
+     * <p>
+     * The returned map is a clone (the caller can modify it without affecting
+     * the stored version).
+     * 
      * @param collection the collection
      * @param key the key
      * @return the map, or null if not found
      */
     @CheckForNull
     Map<String, Object> find(Collection collection, String key);
+    
+    /**
+     * Get a document, ignoring the cache if the cached entry is older than the
+     * specified time.
+     * <p>
+     * The returned map is a clone (the caller can modify it without affecting
+     * the stored version).
+     * 
+     * @param collection the collection
+     * @param key the key
+     * @param maxCacheAge the maximum age of the cached document
+     * @return the map, or null if not found
+     */
+    @CheckForNull
+    Map<String, Object> find(Collection collection, String key, int maxCacheAge);
 
     @Nonnull
     List<Map<String, Object>> query(Collection collection, String fromKey, String
toKey, int limit);
@@ -77,7 +94,9 @@ public interface DocumentStore {
     @Nonnull
     Map<String, Object> createOrUpdate(Collection collection, UpdateOp update)
             throws MicroKernelException;
-    
+
+    void invalidateCache();
+
     void dispose();
 
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java
Thu Apr  4 14:48:46 2013
@@ -48,9 +48,13 @@ public class MemoryDocumentStore impleme
     private ConcurrentSkipListMap<String, Map<String, Object>> nodes =
             new ConcurrentSkipListMap<String, Map<String, Object>>();
 
-    public Map<String, Object> find(Collection collection, String path) {
+    public Map<String, Object> find(Collection collection, String key, int maxCacheAge)
{
+        return find(collection, key);
+    }
+    
+    public Map<String, Object> find(Collection collection, String key) {
         ConcurrentSkipListMap<String, Map<String, Object>> map = getMap(collection);
-        Map<String, Object> n = map.get(path);
+        Map<String, Object> n = map.get(key);
         if (n == null) {
             return null;
         }
@@ -219,6 +223,11 @@ public class MemoryDocumentStore impleme
     }
 
     @Override
+    public void invalidateCache() {
+        // there is no cache, so nothing to invalidate
+    }
+
+    @Override
     public void dispose() {
         // ignore
     }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
Thu Apr  4 14:48:46 2013
@@ -18,7 +18,6 @@ package org.apache.jackrabbit.mongomk.pr
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -27,13 +26,13 @@ import java.util.concurrent.ExecutionExc
 
 import javax.annotation.Nonnull;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mongomk.prototype.UpdateOp.Operation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
 import com.mongodb.DBCollection;
@@ -49,12 +48,6 @@ import com.mongodb.WriteResult;
  */
 public class MongoDocumentStore implements DocumentStore {
 
-    /**
-     * Marker instance to be used as a value in cache to indicate that no value exist for
given key as Guava
-     * cache does not allow null values
-     */
-    static final Map<String, Object> NULL_VAL = Collections.emptyMap();
-
     private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStore.class);
 
     private static final boolean LOG_TIME = false;
@@ -63,7 +56,7 @@ public class MongoDocumentStore implemen
     
     private long time;
     
-    private final Cache<String, Map<String, Object>> cache;
+    private final Cache<String, CachedDocument> cache;
 
     public MongoDocumentStore(DB db) {
         nodesCollection = db.getCollection(Collection.NODES.toString());
@@ -99,26 +92,44 @@ public class MongoDocumentStore implemen
         // oak-jcr doesn't call dispose()
         dispose();
     }
+    
+    @Override
+    public void invalidateCache() {
+        cache.invalidateAll();
+    }
 
+    public Map<String, Object> find(Collection collection, String path) {
+        return find(collection, path, Integer.MAX_VALUE);
+    }
+    
     @Override
-    public Map<String, Object> find(final Collection collection, final String path)
{
+    public Map<String, Object> find(final Collection collection, final String path,
int maxCacheAge) {
         try {
-            Map<String, Object> returnVal = cache.get(path, new Callable<Map<String,
Object>>() {
-                @Override
-                public Map<String, Object> call() throws Exception {
-                    Map<String, Object> result;
-                    result = loadDocument(collection, path);
-                    // support caching of null entries
-                    return result == null ? NULL_VAL : result;
+            CachedDocument doc;
+            while (true) {
+                doc = cache.get(path, new Callable<CachedDocument>() {
+                    @Override
+                    public CachedDocument call() throws Exception {
+                        Map<String, Object> map = findUncached(collection, path);
+                        return new CachedDocument(map);
+                    }
+                });
+                if (maxCacheAge == Integer.MAX_VALUE) {
+                    break;
                 }
-            });
-            return returnVal == NULL_VAL ?  null : returnVal;
+                if (System.currentTimeMillis() - doc.time < maxCacheAge) {
+                    break;
+                }
+                // too old: invalidate, try again
+                cache.invalidate(path);
+            }
+            return doc.value;
         } catch (ExecutionException e) {
             throw new IllegalStateException("Failed to load node " + path, e);
         }
     }
     
-    protected Map<String, Object> loadDocument(Collection collection, String path)
{
+    public Map<String, Object> findUncached(Collection collection, String path) {
         DBCollection dbCollection = getDBCollection(collection);
         long start = start();
         try {
@@ -150,7 +161,7 @@ public class MongoDocumentStore implemen
                 DBObject o = cursor.next();
                 Map<String, Object> map = convertFromDBObject(o);
                 String key = (String) map.get(UpdateOp.ID);
-                cache.put(key, map);
+                cache.put(key, new CachedDocument(map));
                 list.add(map);
             }
             return list;
@@ -247,7 +258,7 @@ public class MongoDocumentStore implemen
             Utils.deepCopyMap(map, newMap);
             String key = updateOp.getKey();
             MemoryDocumentStore.applyChanges(newMap, updateOp);
-            cache.put(key, newMap);
+            cache.put(key, new CachedDocument(newMap));
             
             log("createOrUpdate returns ", map);
             return map;
@@ -306,7 +317,7 @@ public class MongoDocumentStore implemen
                 }
                 for (Map<String, Object> map : maps) {
                     String id = (String) map.get(UpdateOp.ID);
-                    cache.put(id, map);
+                    cache.put(id, new CachedDocument(map));
                 }
                 return true;
             } catch (MongoException e) {
@@ -364,5 +375,16 @@ public class MongoDocumentStore implemen
             LOG.debug(message + argList);
         }
     }
+    
+    /**
+     * A cache entry.
+     */
+    static class CachedDocument {
+        final long time = System.currentTimeMillis();
+        final Map<String, Object> value;
+        CachedDocument(Map<String, Object> value) {
+            this.value = value;
+        }
+    }
 
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
Thu Apr  4 14:48:46 2013
@@ -19,14 +19,17 @@ package org.apache.jackrabbit.mongomk.pr
 import java.io.InputStream;
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -81,7 +84,7 @@ public class MongoMK implements MicroKer
      * cache update).
      */
     // TODO test observation with multiple Oak instances
-    protected static final long ASYNC_DELAY = 1000;
+    protected int asyncDelay = 1000;
 
     /**
      * Whether this instance is disposed.
@@ -106,22 +109,32 @@ public class MongoMK implements MicroKer
     /**
      * The node cache.
      *
-     * Key: path@rev
-     * Value: node
+     * Key: path@rev, value: node
      */
     private final Cache<String, Node> nodeCache;
 
     /**
      * Child node cache.
+     * 
+     * Key: path@rev, value: children
      */
     private final Cache<String, Node.Children> nodeChildrenCache;
 
     /**
      * The unsaved last revisions.
+     * 
      * Key: path, value: revision.
      */
     private final Map<String, Revision> unsavedLastRevisions = 
-            new HashMap<String, Revision>();
+            new ConcurrentHashMap<String, Revision>();
+    
+    /**
+     * The last known revision for each cluster node.
+     * 
+     * Key: the machine id, value: revision.
+     */
+    private final Map<Integer, Revision> lastKnownRevision =
+            new ConcurrentHashMap<Integer, Revision>();
     
     /**
      * The last known head revision. This is the last-known revision.
@@ -130,7 +143,7 @@ public class MongoMK implements MicroKer
     
     private Thread backgroundThread;
     
-    private int simpleRevisionCounter;
+    private AtomicInteger simpleRevisionCounter;
 
     /**
      * Maps branch commit revision to revision it is based on
@@ -142,7 +155,7 @@ public class MongoMK implements MicroKer
      * Create a new in-memory MongoMK used for testing.
      */
     public MongoMK() {
-        this(new MemoryDocumentStore(), new MemoryBlobStore(), 0);
+        this(new Builder());
     }
     
     /**
@@ -152,9 +165,7 @@ public class MongoMK implements MicroKer
      * @param clusterId the cluster id (must be unique)
      */
     public MongoMK(DB db, int clusterId) {
-        this(db == null ? new MemoryDocumentStore() : new MongoDocumentStore(db),
-                db == null ? new MemoryBlobStore() : new MongoBlobStore(db), 
-                clusterId);
+        this(new Builder().setMongoDB(db).setClusterId(clusterId));
     }
 
     /**
@@ -165,9 +176,14 @@ public class MongoMK implements MicroKer
      * @param clusterId the cluster id (must be unique)
      */
     public MongoMK(DocumentStore store, BlobStore blobStore, int clusterId) {
-        this.store = store;
-        this.blobStore = blobStore;
-        this.clusterId = clusterId;
+        this(new Builder().setDocumentStore(store).setBlobStore(blobStore).setClusterId(clusterId));
+    }
+    
+    MongoMK(Builder builder) {
+        this.store = builder.getDocumentStore();
+        this.blobStore = builder.getBlobStore();
+        this.clusterId = builder.getClusterId();
+        this.asyncDelay = builder.getAsyncDelay();
 
         //TODO Use size based weigher
         nodeCache = CacheBuilder.newBuilder()
@@ -177,17 +193,17 @@ public class MongoMK implements MicroKer
         nodeChildrenCache =  CacheBuilder.newBuilder()
                         .maximumSize(CACHE_CHILDREN)
                         .build();
-
+        
+        init();
+    }
+    
+    void init() {
         backgroundThread = new Thread(
                 new BackgroundOperation(this, isDisposed),
                 "MongoMK background thread");
         backgroundThread.setDaemon(true);
         backgroundThread.start();
-            
-        init();
-    }
-    
-    void init() {
+
         headRevision = newRevision();
         Node n = readNode("/", headRevision);
         if (n == null) {
@@ -221,7 +237,7 @@ public class MongoMK implements MicroKer
      * for testing.
      */
     void useSimpleRevisions() {
-        this.simpleRevisionCounter = 1;
+        this.simpleRevisionCounter = new AtomicInteger(1);
         init();
     }
     
@@ -231,14 +247,90 @@ public class MongoMK implements MicroKer
      * @return the revision
      */
     Revision newRevision() {
-        if (simpleRevisionCounter > 0) {
-            return new Revision(simpleRevisionCounter++, 0, clusterId);
+        if (simpleRevisionCounter != null) {
+            return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
         }
         return Revision.newRevision(clusterId);
     }
     
     void runBackgroundOperations() {
-        // to be implemented
+        if (isDisposed.get()) {
+            return;
+        }
+        if (simpleRevisionCounter != null) {
+            // only when using timestamp
+            return;
+        }
+        try {
+            // backgroundWrite();
+            // backgroundRead();
+        } catch (RuntimeException e) {
+            if (isDisposed.get()) {
+                return;
+            }
+            LOG.warn("Background operation failed: " + e.toString(), e);
+        }
+    }
+    
+    private void backgroundRead() {
+        String id = Utils.getIdFromPath("/");
+        Map<String, Object> map = store.find(DocumentStore.Collection.NODES, id, asyncDelay);
+        @SuppressWarnings("unchecked")
+        Map<String, String> lastRevMap = (Map<String, String>) map.get(UpdateOp.LAST_REV);
+        
+        for (Entry<String, String> e : lastRevMap.entrySet()) {
+            int machineId = Integer.parseInt(e.getKey());
+            if (machineId == clusterId) {
+                continue;
+            }
+            Revision r = Revision.fromString(e.getValue());
+            Revision last = lastKnownRevision.get(machineId);
+            
+            if (last == null || last.compareRevisionTime(r) != 0) {
+                // TODO invalidating the whole cache is not really needed,
+                // instead only those children that are cached could be checked
+                
+                store.invalidateCache();
+                lastKnownRevision.put(machineId, r);
+                // add a new revision, so that changes are visible
+                headRevision = Revision.newRevision(clusterId);
+            }
+        }
+    }
+    
+    private void backgroundWrite() {
+        if (unsavedLastRevisions.size() == 0) {
+            return;
+        }
+        ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.keySet());
+        // sort by depth (high depth first), then path
+        Collections.sort(paths, new Comparator<String>() {
+
+            @Override
+            public int compare(String o1, String o2) {
+                int d1 = Utils.pathDepth(o1);
+                int d2 = Utils.pathDepth(o1);
+                if (d1 != d2) {
+                    return Integer.signum(d1 - d2);
+                }
+                return o1.compareTo(o2);
+            }
+            
+        });
+        long now = Revision.getCurrentTimestamp();
+        for (String p : paths) {
+            Revision r = unsavedLastRevisions.get(p);
+            if (r == null) {
+                continue;
+            }
+            if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
+                continue;
+            }
+            
+            Commit commit = new Commit(this, null, r);
+            commit.touchNode(p);
+            commit.apply();
+        }
     }
     
     public void dispose() {
@@ -377,7 +469,8 @@ public class MongoMK implements MicroKer
             return false;
         }
         // get root of commit
-        nodeMap = store.find(DocumentStore.Collection.NODES, Utils.getIdFromPath(commitRootPath));
+        nodeMap = store.find(DocumentStore.Collection.NODES, 
+                Utils.getIdFromPath(commitRootPath));
         if (nodeMap == null) {
             return false;
         }
@@ -1075,21 +1168,68 @@ public class MongoMK implements MicroKer
         return store;
     }
     
+    public void setAsyncDelay(int delay) {
+        this.asyncDelay = delay;
+    }
+    
+    public int getAsyncDelay() {
+        return asyncDelay;
+    }
+
+    /**
+     * Apply the changes of a node to the cache.
+     * 
+     * @param rev the revision
+     * @param path the path
+     * @param isNew whether this is a new node
+     * @param isDelete whether the node is deleted
+     * @param isWritten whether the MongoDB documented was added / updated
+     * @param added the list of added child nodes
+     * @param removed the list of removed child nodes
+     */
+    public void applyChanges(Revision rev, String path, 
+            boolean isNew, boolean isDelete, boolean isWritten, 
+            ArrayList<String> added, ArrayList<String> removed) {
+        if (!isWritten) {
+            unsavedLastRevisions.put(path, rev);
+        } else {
+            // the document was updated:
+            // we no longer need to update it in a background process
+            unsavedLastRevisions.remove(path);
+        }
+        Children c = nodeChildrenCache.getIfPresent(path + "@" + rev);
+        if (isNew || (!isDelete && c != null)) {
+            String key = path + "@" + rev;
+            Children c2 = new Children(path, rev);
+            TreeSet<String> set = new TreeSet<String>();
+            if (c != null) {
+                set.addAll(c.children);
+            }
+            set.removeAll(removed);
+            set.addAll(added);
+            c2.children.addAll(set);
+            nodeChildrenCache.put(key, c2);
+        }
+    }
+    
     /**
      * A background thread.
      */
     static class BackgroundOperation implements Runnable {
         final WeakReference<MongoMK> ref;
         private final AtomicBoolean isDisposed;
+        private int delay;
+        
         BackgroundOperation(MongoMK mk, AtomicBoolean isDisposed) {
             ref = new WeakReference<MongoMK>(mk);
+            delay = mk.getAsyncDelay();
             this.isDisposed = isDisposed;
         }
         public void run() {
             while (!isDisposed.get()) {
                 synchronized (isDisposed) {
                     try {
-                        isDisposed.wait(ASYNC_DELAY);
+                        isDisposed.wait(delay);
                     } catch (InterruptedException e) {
                         // ignore
                     }
@@ -1097,32 +1237,111 @@ public class MongoMK implements MicroKer
                 MongoMK mk = ref.get();
                 if (mk != null) {
                     mk.runBackgroundOperations();
+                    delay = mk.getAsyncDelay();
                 }
             }
         }
     }
 
-    public void applyChanges(Revision rev, String path, 
-            boolean isNew, boolean isDelete, boolean isWritten, 
-            ArrayList<String> added, ArrayList<String> removed) {
-        if (!isWritten) {
-            unsavedLastRevisions.put(path, rev);
-        } else {
-            unsavedLastRevisions.remove(path);
-        }
-        Children c = nodeChildrenCache.getIfPresent(path + "@" + rev);
-        if (isNew || (!isDelete && c != null)) {
-            String key = path + "@" + rev;
-            Children c2 = new Children(path, rev);
-            TreeSet<String> set = new TreeSet<String>();
-            if (c != null) {
-                set.addAll(c.children);
-            }
-            set.removeAll(removed);
-            set.addAll(added);
-            c2.children.addAll(set);
-            nodeChildrenCache.put(key, c2);
+    /**
+     * A builder for a MongoMK instance.
+     */
+    public static class Builder {
+        
+        private DocumentStore documentStore;
+        private BlobStore blobStore;
+        private int clusterId;
+        private int asyncDelay = 1000;
+
+        /**
+         * Set the MongoDB connection to use. By default an in-memory store is used.
+         * 
+         * @param db the MongoDB connection
+         * @return this
+         */
+        public Builder setMongoDB(DB db) {
+            if (db != null) {
+                this.documentStore = new MongoDocumentStore(db);
+                this.blobStore = new MongoBlobStore(db);
+            }
+            return this;
+        }
+        
+        /**
+         * Set the document store to use. By default an in-memory store is used.
+         * 
+         * @param documentStore the document store
+         * @return this
+         */
+        public Builder setDocumentStore(DocumentStore documentStore) {
+            this.documentStore = documentStore;
+            return this;
+        }
+        
+        public DocumentStore getDocumentStore() {
+            if (documentStore == null) {
+                documentStore = new MemoryDocumentStore();
+            }
+            return documentStore;
+        }
+
+        /**
+         * Set the blob store to use. By default an in-memory store is used.
+         * 
+         * @param blobStore the blob store
+         * @return this
+         */
+        public Builder setBlobStore(BlobStore blobStore) {
+            this.blobStore = blobStore;
+            return this;
+        }
+
+        public BlobStore getBlobStore() {
+            if (blobStore == null) {
+                blobStore = new MemoryBlobStore();
+            }
+            return blobStore;
+        }
+
+        /**
+         * Set the cluster id to use. By default, 0 is used.
+         * 
+         * @param clusterId the cluster id
+         * @return this
+         */
+        public Builder setClusterId(int clusterId) {
+            this.clusterId = clusterId;
+            return this;
+        }
+        
+        public int getClusterId() {
+            return clusterId;
+        }
+        
+        /**
+         * Set the maximum delay to write the last revision to the root node. By
+         * default 1000 (meaning 1 second) is used.
+         * 
+         * @param asyncDelay in milliseconds
+         * @return this
+         */
+        public Builder setAsyncDelay(int asyncDelay) {
+            this.asyncDelay = asyncDelay;
+            return this;
+        }
+        
+        public int getAsyncDelay() {
+            return asyncDelay;
+        }
+        
+        /**
+         * Open the MongoMK instance using the configured options.
+         * 
+         * @return the MongoMK instance
+         */
+        public MongoMK open() {
+            return new MongoMK(this);
         }
     }
-    
+
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java
Thu Apr  4 14:48:46 2013
@@ -68,7 +68,7 @@ public class Revision {
      * @return the unique revision id
      */
     static Revision newRevision(int clusterId) {
-        long timestamp = System.currentTimeMillis() / 100 - timestampOffset;
+        long timestamp = getCurrentTimestamp();
         int c;
         synchronized (Revision.class) {
             if (timestamp > lastTimestamp) {
@@ -89,6 +89,26 @@ public class Revision {
         return new Revision(timestamp, c, clusterId);
     }
     
+    /**
+     * Get the timestamp value of the current date and time.
+     * 
+     * @return the timestamp
+     */
+    public static long getCurrentTimestamp() {
+        return System.currentTimeMillis() / 100 - timestampOffset;
+    }
+    
+    /**
+     * Get the difference between two timestamps (a - b) in milliseconds.
+     * 
+     * @param a the first timestamp
+     * @param b the second timestamp
+     * @return the difference in milliseconds
+     */
+    public static long getTimestampDifference(long a, long b) {
+        return (a - b) * 100;
+    }
+    
     public static Revision fromString(String rev) {
         if (!rev.startsWith("r")) {
             throw new IllegalArgumentException(rev);

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java
Thu Apr  4 14:48:46 2013
@@ -30,7 +30,7 @@ public class BaseMongoMKTest extends Bas
     @Override
     public void setUp() throws Exception {
         DB db = mongoConnection.getDB();
-        mk = new MongoMK(db, 0);
+        mk = new MongoMK.Builder().setMongoDB(db).open();
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java
Thu Apr  4 14:48:46 2013
@@ -50,7 +50,8 @@ public class BlobTest {
 
     @Test
     public void addBlobs() throws Exception {
-        MongoMK mk = new MongoMK(openMongoConnection(), 0);
+        MongoMK mk = new MongoMK.Builder().
+                setMongoDB(openMongoConnection()).open();
         long blobSize = TOTAL_SIZE / DOCUMENT_COUNT;
         ArrayList<String> blobIds = new ArrayList<String>();
         // use a new seed each time, to allow running the test multiple times 

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
Thu Apr  4 14:48:46 2013
@@ -16,10 +16,12 @@
  */
 package org.apache.jackrabbit.mongomk.prototype;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.mongodb.DB;
@@ -56,6 +58,35 @@ public class ClusterTest {
     }
     
     @Test
+    @Ignore
+    public void revisionVisibility() throws InterruptedException {
+        MongoMK mk1 = createMK(1);
+        MongoMK mk2 = createMK(2);
+        
+        String m2h;
+        m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
+        assertEquals("{\":childNodeCount\":0}", m2h);
+        
+        mk1.commit("/", "+\"test\":{}", null, null);
+        String m1h = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 1, null);
+        assertEquals("{\"test\":{},\":childNodeCount\":1}", m1h);
+        
+        m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
+        // not available yet...
+        assertEquals("{\":childNodeCount\":0}", m2h);
+        
+        // the delay is 10 ms
+        Thread.sleep(100);
+        
+        // so now it should be available
+        m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 5, null);
+        assertEquals("{\"test\":{},\":childNodeCount\":1}", m2h);
+        
+        mk1.dispose();
+        mk2.dispose();
+    }    
+    
+    @Test
     public void rollbackAfterConflict() {
         MongoMK mk1 = createMK(1);
         MongoMK mk2 = createMK(2);
@@ -78,18 +109,22 @@ public class ClusterTest {
 
 
     private MongoMK createMK(int clusterId) {
+        MongoMK.Builder builder = new MongoMK.Builder();
         if (MONGO_DB) {
             DB db = MongoUtils.getConnection().getDB();
             MongoUtils.dropCollections(db);
-            return new MongoMK(db, clusterId);
-        }
-        if (ds == null) {
-            ds = new MemoryDocumentStore();
-        }
-        if (bs == null) {
-            bs = new MemoryBlobStore();
+            builder.setMongoDB(db);
+        } else {
+            if (ds == null) {
+                ds = new MemoryDocumentStore();
+            }
+            if (bs == null) {
+                bs = new MemoryBlobStore();
+            }
+            builder.setDocumentStore(ds).setBlobStore(bs);
         }
-        return new MongoMK(ds, bs, clusterId);
+        builder.setAsyncDelay(10);
+        return builder.setClusterId(clusterId).open();
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java
Thu Apr  4 14:48:46 2013
@@ -236,18 +236,21 @@ System.out.print(msg);
     }
     
     private MongoMK createMK(int clusterId) {
+        MongoMK.Builder builder = new MongoMK.Builder();
         if (MONGO_DB) {
             DB db = MongoUtils.getConnection().getDB();
             MongoUtils.dropCollections(db);
-            return new MongoMK(db, clusterId);
+            builder.setMongoDB(db);
+        } else {
+            if (ds == null) {
+                ds = new MemoryDocumentStore();
+            }
+            if (bs == null) {
+                bs = new MemoryBlobStore();
+            }
+            builder.setDocumentStore(ds).setBlobStore(bs);
         }
-        if (ds == null) {
-            ds = new MemoryDocumentStore();
-        }
-        if (bs == null) {
-            bs = new MemoryBlobStore();
-        }
-        return new MongoMK(ds, bs, clusterId);
+        return builder.setClusterId(clusterId).open();
     }
     
     /**

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java
Thu Apr  4 14:48:46 2013
@@ -237,12 +237,13 @@ public class RandomizedTest {
     }
     
     private static MongoMK createMK() {
+        MongoMK.Builder builder = new MongoMK.Builder();
         if (MONGO_DB) {
             DB db = MongoUtils.getConnection().getDB();
             MongoUtils.dropCollections(db);
-            return new MongoMK(db, 0);
+            builder.setMongoDB(db);
         }
-        return new MongoMK();
+        return builder.open();
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
(original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
Thu Apr  4 14:48:46 2013
@@ -42,7 +42,7 @@ public class SimpleTest {
 
     @Test
     public void test() {
-        MongoMK mk = new MongoMK();
+        MongoMK mk = new MongoMK.Builder().open();
         mk.dispose();
     }
 
@@ -80,7 +80,7 @@ public class SimpleTest {
     
     @Test
     public void addNodeGetNode() {
-        MongoMK mk = new MongoMK();
+        MongoMK mk = new MongoMK.Builder().open();
         Revision rev = mk.newRevision();
         Node n = new Node("/test", rev);
         n.setProperty("name", "Hello");
@@ -351,12 +351,13 @@ public class SimpleTest {
     }
 
     private static MongoMK createMK() {
+        MongoMK.Builder builder = new MongoMK.Builder();
         if (MONGO_DB) {
             DB db = MongoUtils.getConnection().getDB();
             MongoUtils.dropCollections(db);
-            return new MongoMK(db, 0);
+            builder.setMongoDB(db);
         }
-        return new MongoMK();
+        return builder.open();
     }
 
 }



Mime
View raw message