jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chet...@apache.org
Subject svn commit: r1594161 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ main/java/org/apache/jackrabbit/oak/plugins/document/util/ test/java/org/apa...
Date Tue, 13 May 2014 09:17:10 GMT
Author: chetanm
Date: Tue May 13 09:17:10 2014
New Revision: 1594161

URL: http://svn.apache.org/r1594161
Log:
OAK-1645 - Route find queries to Mongo secondary in MongoDocumentStore

The find and query method now explicitly define the read preference to be used. The preference
is determined by type of Collection , maxCacheAge and modified time of parent. if preference
is secondary then actual preference used is picked from DB level settings

Also exposed a setting 'maxReplicationLagMillis' which determines the max possible lag in
replication beyond which its safe to route the read queries to secondaries

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
  (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1594161&r1=1594160&r2=1594161&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
Tue May 13 09:17:10 2014
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.io.InputStream;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -463,6 +464,7 @@ public class DocumentMK implements Micro
         private boolean useSimpleRevision;
         private long splitDocumentAgeMillis = 5 * 60 * 1000;
         private long offHeapCacheSize = -1;
+        private long maxReplicationLagMillis = TimeUnit.HOURS.toMillis(6);
         private boolean disableBranches;
         private Clock clock = Clock.SIMPLE;
         private Executor executor;
@@ -731,6 +733,15 @@ public class DocumentMK implements Micro
             return clock;
         }
 
+        public Builder setMaxReplicationLag(long duration, TimeUnit unit){
+            maxReplicationLagMillis = unit.toMillis(duration);
+            return this;
+        }
+
+        public long getMaxReplicationLagMillis() {
+            return maxReplicationLagMillis;
+        }
+
         public Builder disableBranches() {
             disableBranches = true;
             return this;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1594161&r1=1594160&r2=1594161&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
Tue May 13 09:17:10 2014
@@ -37,6 +37,7 @@ import javax.annotation.Nullable;
 import com.google.common.base.Splitter;
 
 import com.google.common.collect.Lists;
+import com.mongodb.ReadPreference;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.cache.CacheValue;
@@ -56,6 +57,8 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocOffHeapCache;
 import org.apache.jackrabbit.oak.plugins.document.cache.OffHeapCache;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +75,6 @@ import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.MongoException;
 import com.mongodb.QueryBuilder;
-import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
 
@@ -87,6 +89,13 @@ public class MongoDocumentStore implemen
 
     private static final DBObject BY_ID_ASC = new BasicDBObject(Document.ID, 1);
 
+    static enum DocumentReadPreference {
+        PRIMARY,
+        PREFER_PRIMARY,
+        PREFER_SECONDARY,
+        PREFER_SECONDARY_IF_OLD_ENOUGH
+    }
+
     public static final int IN_CLAUSE_BATCH_SIZE = 500;
 
     private final DBCollection nodes;
@@ -112,9 +121,16 @@ public class MongoDocumentStore implemen
      */
     private final Comparator<Revision> comparator = StableRevisionComparator.REVERSE;
 
+    private final Clock clock;
+
+    private final DB db;
+
+    private final long maxReplicationLagMillis;
+
     private String lastReadWriteMode;
 
     public MongoDocumentStore(DB db, DocumentMK.Builder builder) {
+        this.db = db;
         nodes = db.getCollection(
                 Collection.NODES.toString());
         clusterNodes = db.getCollection(
@@ -122,6 +138,9 @@ public class MongoDocumentStore implemen
         settings = db.getCollection(
                 Collection.SETTINGS.toString());
 
+        clock = builder.getClock();
+        maxReplicationLagMillis = builder.getMaxReplicationLagMillis();
+
         // indexes:
         // the _id field is the primary key, so we don't need to define it
         DBObject index = new BasicDBObject();
@@ -231,25 +250,32 @@ public class MongoDocumentStore implemen
 
     @Override
     public <T extends Document> T find(Collection<T> collection, String key)
{
-        return find(collection, key, Integer.MAX_VALUE);
+        return find(collection, key, true, -1);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <T extends Document> T find(final Collection<T> collection,
                                        final String key,
                                        int maxCacheAge) {
+        return find(collection, key, false, maxCacheAge);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T extends Document> T find(final Collection<T> collection,
+                                       final String key,
+                                       boolean preferCached,
+                                       final int maxCacheAge) {
         if (collection != Collection.NODES) {
-            return findUncached(collection, key);
+            return findUncached(collection, key, DocumentReadPreference.PRIMARY);
         }
         CacheValue cacheKey = new StringValue(key);
         NodeDocument doc;
-        if (maxCacheAge > 0) {
+        if (maxCacheAge > 0 || preferCached) {
             // first try without lock
             doc = nodesCache.getIfPresent(cacheKey);
             if (doc != null) {
-                if (maxCacheAge == Integer.MAX_VALUE ||
-                        System.currentTimeMillis() - doc.getCreated() < maxCacheAge) {
+                if (preferCached ||
+                        getTime() - doc.getCreated() < maxCacheAge) {
                     if (doc == NodeDocument.NULL) {
                         return null;
                     }
@@ -267,17 +293,17 @@ public class MongoDocumentStore implemen
                     doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() {
                         @Override
                         public NodeDocument call() throws Exception {
-                            NodeDocument doc = (NodeDocument) findUncached(collection, key);
+                            NodeDocument doc = (NodeDocument) findUncached(collection, key,
getReadPreference(maxCacheAge));
                             if (doc == null) {
                                 doc = NodeDocument.NULL;
                             }
                             return doc;
                         }
                     });
-                    if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
+                    if (maxCacheAge == 0 || preferCached) {
                         break;
                     }
-                    if (System.currentTimeMillis() - doc.getCreated() < maxCacheAge) {
+                    if (getTime() - doc.getCreated() < maxCacheAge) {
                         break;
                     }
                     // too old: invalidate, try again
@@ -297,12 +323,25 @@ public class MongoDocumentStore implemen
     }
 
     @CheckForNull
-    <T extends Document> T findUncached(Collection<T> collection, String key)
{
+    private <T extends Document> T findUncached(Collection<T> collection, String
key, DocumentReadPreference docReadPref) {
         DBCollection dbCollection = getDBCollection(collection);
         long start = start();
         try {
-            DBObject obj = dbCollection.findOne(getByKeyQuery(key).get());
-            if (obj == null) {
+            ReadPreference readPreference = getMongoReadPreference(collection, key, docReadPref);
+            DBObject obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, readPreference);
+
+            if (obj == null
+                    && readPreference.isSlaveOk()) {
+                //In case secondary read preference is used and node is not found
+                //then check with primary again as it might happen that node document has
not been
+                //replicated. This is required for case like SplitDocument where the SplitDoc
is fetched with
+                //maxCacheAge == Integer.MAX_VALUE which results in readPreference of secondary.
+                //In such a case we know that document with such an id must exist
+                //but possibly dut to replication lag it has not reached to secondary. So
in that case read again
+                //from primary
+                obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, ReadPreference.primary());
+            }
+            if(obj == null){
                 return null;
             }
             T doc = convertFromDBObject(collection, obj);
@@ -345,6 +384,8 @@ public class MongoDocumentStore implemen
         long start = start();
         try {
             DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC);
+            cursor.setReadPreference(getMongoReadPreference(collection, fromKey, getDefaultReadPreference(collection)));
+
             List<T> list;
             try {
                 list = new ArrayList<T>();
@@ -624,6 +665,67 @@ public class MongoDocumentStore implemen
         }
     }
 
+    DocumentReadPreference getReadPreference(int maxCacheAge){
+        if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) {
+            return DocumentReadPreference.PRIMARY;
+        } else if(maxCacheAge == Integer.MAX_VALUE){
+            return DocumentReadPreference.PREFER_SECONDARY;
+        } else {
+           return DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH;
+        }
+    }
+
+    DocumentReadPreference getDefaultReadPreference(Collection col){
+        return col == Collection.NODES ? DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH
: DocumentReadPreference.PRIMARY;
+    }
+
+    <T extends Document> ReadPreference getMongoReadPreference(Collection<T>
collection,
+                                                               String key,
+                                                               DocumentReadPreference preference)
{
+        switch(preference){
+            case PRIMARY:
+                return ReadPreference.primary();
+            case PREFER_PRIMARY :
+                return ReadPreference.primaryPreferred();
+            case PREFER_SECONDARY :
+                return getDefaultReadPreference();
+            case PREFER_SECONDARY_IF_OLD_ENOUGH:
+                if(collection != Collection.NODES){
+                    return ReadPreference.primary();
+                }
+
+                //Default to primary preferred such that in case primary is being elected
+                //we can still read from secondary
+                //TODO REVIEW Would that be safe
+                ReadPreference readPreference = ReadPreference.primaryPreferred();
+                String parentId = Utils.getParentId(key);
+                if (parentId != null) {
+                    long replicationSafeLimit = getTime() - maxReplicationLagMillis;
+                    NodeDocument cachedDoc = (NodeDocument) getIfCached(collection, parentId);
+                    if (cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit))
{
+
+                        //If parent has been modified loooong time back then there children
+                        //would also have not be modified. In that case we can read from
secondary
+                        readPreference = getDefaultReadPreference();
+                    }
+                }
+                return readPreference;
+            default:
+                throw new IllegalArgumentException("Unsupported usage " + preference);
+        }
+    }
+
+    /**
+     * Retrieves the ReadPreference specified for the Mongo DB in use irrespective of
+     * DBCollection. Depending on deployments the user can tweak the default references
+     * to read from secondary and in that also tag secondaries
+     *
+     * @return db level ReadPreference
+     */
+    ReadPreference getDefaultReadPreference(){
+        return db.getReadPreference();
+    }
+
     @CheckForNull
     <T extends Document> T convertFromDBObject(@Nonnull Collection<T> collection,
                                                @Nullable DBObject n) {
@@ -940,4 +1042,8 @@ public class MongoDocumentStore implemen
             LOG.error("Error setting readWriteMode " + readWriteMode, e);
         }
     }
+
+    private long getTime() {
+        return clock.getTime();
+    }
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1594161&r1=1594160&r2=1594161&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
Tue May 13 09:17:10 2014
@@ -30,6 +30,7 @@ import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -248,6 +249,31 @@ public class Utils {
         return depth + ":" + path;
     }
 
+    /**
+     * Returns the parent id for given id if possible
+     *
+     * <p>It would return null in following cases
+     * <ul>
+     *     <li>If id is from long path</li>
+     *     <li>If id is for root path</li>
+     * </ul>
+     *</p>
+     * @param id id for which parent id needs to be determined
+     * @return parent id. null if parent id cannot be determined
+     */
+    @CheckForNull
+    public static String getParentId(String id){
+        if(Utils.isIdFromLongPath(id)){
+            return null;
+        }
+        String path = Utils.getPathFromId(id);
+        if(PathUtils.denotesRoot(path)){
+            return null;
+        }
+        String parentPath = PathUtils.getParentPath(path);
+        return Utils.getIdFromPath(parentPath);
+    }
+
     public static boolean isLongPath(String path) {
         // the most common case: a short path
         // avoid calculating the parent path

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java?rev=1594161&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
(added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
Tue May 13 09:17:10 2014
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.concurrent.TimeUnit;
+
+import com.mongodb.ReadPreference;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.SETTINGS;
+import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore.DocumentReadPreference;
+import static org.junit.Assert.assertEquals;
+
+public class ReadPreferenceIT {
+
+    private DocumentNodeStore documentNodeStore;
+    private NodeStore nodeStore;
+
+    private MongoDocumentStore mongoDS;
+
+    private Clock clock;
+
+    private long replicationLag;
+
+    @BeforeClass
+    public static void checkMongoDbAvailable() {
+        Assume.assumeNotNull(MongoUtils.getConnection());
+    }
+
+    @Before
+    public void prepareStores() throws Exception {
+        clock = new Clock.Virtual();
+        replicationLag = TimeUnit.SECONDS.toMillis(10);
+        MongoConnection mc = MongoUtils.getConnection();
+        documentNodeStore = new DocumentMK.Builder()
+                .clock(clock)
+                .setMaxReplicationLag(replicationLag, TimeUnit.HOURS)
+                .setMongoDB(mc.getDB())
+                .setClusterId(1)
+                .getNodeStore();
+        mongoDS = (MongoDocumentStore) documentNodeStore.getDocumentStore();
+        nodeStore = documentNodeStore;
+    }
+
+    @After
+    public void clearDB() {
+        MongoUtils.dropCollections(mongoDS.getDBCollection(NODES).getDB());
+    }
+
+    @Test
+    public void testPreferenceConversion() throws Exception{
+        //For cacheAge < replicationLag result should be primary
+        assertEquals(DocumentReadPreference.PRIMARY, mongoDS.getReadPreference(0));
+        assertEquals(DocumentReadPreference.PRIMARY,
+                mongoDS.getReadPreference((int) (replicationLag - 100)));
+
+        //For Integer.MAX_VALUE it should be secondary as caller intends that value is stable
+        assertEquals(DocumentReadPreference.PREFER_SECONDARY,
+                mongoDS.getReadPreference(Integer.MAX_VALUE));
+
+        //For all other cases depends on age
+        assertEquals(DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH,
+                mongoDS.getReadPreference(-1));
+        assertEquals(DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH,
+                mongoDS.getReadPreference((int) (replicationLag + 100)));
+    }
+
+    @Test
+    public void testMongoReadPreferencesDefault() throws Exception{
+        assertEquals(ReadPreference.primary(),
+                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PRIMARY));
+
+        assertEquals(ReadPreference.primaryPreferred(),
+                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_PRIMARY));
+
+        //By default Mongo read preference is primary
+        assertEquals(ReadPreference.primary(),
+                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
+
+        //Change the default and assert again
+        mongoDS.getDBCollection(NODES).getDB().setReadPreference(ReadPreference.secondary());
+        assertEquals(ReadPreference.secondary(),
+                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
+
+        //for case where parent age cannot be determined the preference should be primaryPreferred
+        assertEquals(ReadPreference.primaryPreferred(),
+                mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+
+        //For collection other than NODES always primary
+        assertEquals(ReadPreference.primary(),
+                mongoDS.getMongoReadPreference(SETTINGS,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+
+    }
+
+    @Test
+    public void testMongoReadPreferencesWithAge() throws Exception{
+        //Change the default
+        ReadPreference testPref = ReadPreference.secondary();
+        mongoDS.getDBCollection(NODES).getDB().setReadPreference(testPref);
+
+        NodeBuilder b1 = nodeStore.getRoot().builder();
+        b1.child("x").child("y");
+        nodeStore.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        String id = Utils.getIdFromPath("/x/y");
+        mongoDS.invalidateCache(NODES,id);
+
+        //For modifiedTime < replicationLag primary should be preferred
+        assertEquals(ReadPreference.primaryPreferred(),
+                mongoDS.getMongoReadPreference(NODES,id, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+
+        //Going into future to make parent /x old enough
+        clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag);
+
+        //For old modified nodes secondaries should be preferred
+        assertEquals(testPref,
+                mongoDS.getMongoReadPreference(NODES,id, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message