jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1574626 - in /jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr: delegate/SessionDelegate.java repository/RepositoryImpl.java session/RefreshStrategy.java
Date Wed, 05 Mar 2014 19:11:06 GMT
Author: jukka
Date: Wed Mar  5 19:11:06 2014
New Revision: 1574626

URL: http://svn.apache.org/r1574626
Log:
OAK-1418: Read performance regression

Inline RefreshStrategy.Once and .ThreadSynchronizing to optimize the
critical path in ReadPropertyTest

Modified:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1574626&r1=1574625&r2=1574626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
(original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
Wed Mar  5 19:11:06 2014
@@ -66,6 +66,22 @@ public class SessionDelegate {
 
     private final ContentSession contentSession;
     private final RefreshStrategy refreshStrategy;
+    private boolean refreshAtNextAccess = false;
+
+    /**
+     * The repository-wide {@link ThreadLocal} that keeps track of the number
+     * of saves performed in each thread.
+     */
+    private final ThreadLocal<Long> threadSaveCount;
+
+    /**
+     * Local copy of the {@link #threadSaveCount} for the current thread.
+     * If the repository-wide counter differs from our local copy, then
+     * some other session would have done a commit or this session is
+     * being accessed from some other thread. In either case it's best to
+     * refresh this session to avoid unexpected behaviour.
+     */
+    private long sessionSaveCount;
 
     private final Root root;
     private final IdentifierManager idManager;
@@ -111,9 +127,12 @@ public class SessionDelegate {
     public SessionDelegate(
             @Nonnull ContentSession contentSession,
             @Nonnull RefreshStrategy refreshStrategy,
+            @Nonnull ThreadLocal<Long> threadSaveCount,
             @Nonnull StatisticManager statisticManager) {
         this.contentSession = checkNotNull(contentSession);
         this.refreshStrategy = checkNotNull(refreshStrategy);
+        this.threadSaveCount = checkNotNull(threadSaveCount);
+        this.sessionSaveCount = getThreadSaveCount();
         this.root = contentSession.getLatestRoot();
         this.idManager = new IdentifierManager(root);
         this.sessionStats = new SessionStats(this);
@@ -129,6 +148,11 @@ public class SessionDelegate {
         return sessionStats;
     }
 
+    private long getThreadSaveCount() {
+        Long c = threadSaveCount.get();
+        return c == null ? 0 : c;
+    }
+
     public long getNanosecondsSinceLastAccess() {
         return nanoTime() - lastAccessTimeNS;
     }
@@ -182,8 +206,8 @@ public class SessionDelegate {
         return saveCount;
     }
 
-    public void refreshAtNextAccess() {
-        refreshStrategy.refreshAtNextAccess();
+    public synchronized void refreshAtNextAccess() {
+        refreshAtNextAccess = true;
     }
 
     /**
@@ -221,9 +245,12 @@ public class SessionDelegate {
             if (!sessionOperation.isRefresh()
                     && !sessionOperation.isSave()
                     && !sessionOperation.isLogout()
-                    && refreshStrategy.needsRefresh(t0 - lastAccessTimeNS)) {
+                    && (refreshAtNextAccess
+                        || sessionSaveCount != getThreadSaveCount()
+                        || refreshStrategy.needsRefresh(t0 - lastAccessTimeNS))) {
                 refresh(true);
-                refreshStrategy.refreshed();
+                refreshAtNextAccess = false;
+                sessionSaveCount = getThreadSaveCount();
                 updateCount++;
             }
             sessionOperation.checkPreconditions();
@@ -250,9 +277,12 @@ public class SessionDelegate {
                 readDuration.addAndGet(dt);
             }
             if (sessionOperation.isSave()) {
-                refreshStrategy.saved();
+                refreshAtNextAccess = false;
+                // Force refreshing on access through other sessions on the same thread
+                threadSaveCount.set(sessionSaveCount = (getThreadSaveCount() + 1));
             } else if (sessionOperation.isRefresh()) {
-                refreshStrategy.refreshed();
+                refreshAtNextAccess = false;
+                sessionSaveCount = getThreadSaveCount();
             }
         }
     }

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java?rev=1574626&r1=1574625&r2=1574626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
(original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
Wed Mar  5 19:11:06 2014
@@ -40,6 +40,7 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.ListenableScheduledFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import org.apache.jackrabbit.api.JackrabbitRepository;
 import org.apache.jackrabbit.api.security.authentication.token.TokenCredentials;
 import org.apache.jackrabbit.commons.SimpleValueFactory;
@@ -49,10 +50,6 @@ import org.apache.jackrabbit.oak.api.jmx
 import org.apache.jackrabbit.oak.stats.StatisticManager;
 import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
 import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy;
-import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.LogOnce;
-import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Once;
-import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.ThreadSynchronising;
-import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Timed;
 import org.apache.jackrabbit.oak.jcr.session.SessionContext;
 import org.apache.jackrabbit.oak.jcr.session.SessionStats;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
@@ -85,7 +82,20 @@ public class RepositoryImpl implements J
     private final ContentRepository contentRepository;
     protected final Whiteboard whiteboard;
     private final SecurityProvider securityProvider;
-    private final ThreadLocal<Long> threadSaveCount;
+
+    /**
+     * {@link ThreadLocal} counter that keeps track of the save operations
+     * performed per thread so far. This is is then used to determine if
+     * the current session needs to be refreshed to see the changes done by
+     * another session in the same thread.
+     * <p>
+     * <b>Note</b> - This thread local is never cleared. However, we only
+     * store a {@link Long} instance and do not derive from
+     * {@link ThreadLocal} so that (class loader) leaks typically associated
+     * with thread locals do not occur.
+     */
+    private final ThreadLocal<Long> threadSaveCount = new ThreadLocal<Long>();
+
     private final ListeningScheduledExecutorService scheduledExecutor =
             MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
     private final StatisticManager statisticManager;
@@ -96,7 +106,6 @@ public class RepositoryImpl implements J
         this.contentRepository = checkNotNull(contentRepository);
         this.whiteboard = checkNotNull(whiteboard);
         this.securityProvider = checkNotNull(securityProvider);
-        this.threadSaveCount = new ThreadLocal<Long>();
         this.descriptors = determineDescriptors();
         this.statisticManager = new StatisticManager(whiteboard, scheduledExecutor);
     }
@@ -235,7 +244,9 @@ public class RepositoryImpl implements J
     private SessionDelegate createSessionDelegate(
             final RefreshStrategy refreshStrategy,
             final ContentSession contentSession) {
-        return new SessionDelegate(contentSession, refreshStrategy, statisticManager) {
+        return new SessionDelegate(
+                contentSession, refreshStrategy,
+                threadSaveCount, statisticManager) {
             // Defer session MBean registration to avoid cluttering the
             // JMX name space with short lived sessions
             ListenableScheduledFuture<Registration> registration = scheduledExecutor.schedule(
@@ -364,15 +375,11 @@ public class RepositoryImpl implements J
      * minute of inactivity.
      */
     private RefreshStrategy createRefreshStrategy(Long refreshInterval) {
-        return new RefreshStrategy(refreshInterval == null
-                ? new RefreshStrategy[] {
-                new Once(false),
-                new LogOnce(60),
-                new ThreadSynchronising(threadSaveCount)}
-                : new RefreshStrategy[] {
-                new Once(false),
-                new Timed(refreshInterval),
-                new ThreadSynchronising(threadSaveCount)});
+        if (refreshInterval == null) {
+            return new RefreshStrategy.LogOnce(60);
+        } else {
+            return new RefreshStrategy.Timed(refreshInterval);
+        }
     }
 
     private static class RegistrationCallable implements Callable<Registration> {

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java?rev=1574626&r1=1574625&r2=1574626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java
(original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java
Wed Mar  5 19:11:06 2014
@@ -23,41 +23,20 @@ import static java.util.concurrent.TimeU
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
-import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
-import org.apache.jackrabbit.oak.jcr.session.operation.SessionOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Instances of this class determine whether a session needs to be refreshed
- * before the next session operation is performed.
- * <p>
- * Before an operation is performed a session calls {@link #needsRefresh(SessionDelegate)},
- * to determine whether the session needs to be refreshed first. To maintain a session strategy's
- * state sessions call {@link #refreshed()} right after each refresh operation and
- * {@link #saved()} right after each save operation.
- * <p>
- * {@code RefreshStrategy} is a composite of zero or more {@code RefreshStrategy} instances,
- * each of which covers a certain strategy.
- * @see Default
- * @see Once
+ * Implementations of this interface determine whether a session needs
+ * to be refreshed before the next session operation is performed. This is
+ * done by the session calling {@link #needsRefresh(long)} to determine
+ * whether a refresh is needed.
+ *
+ * @see Composite
  * @see Timed
  * @see LogOnce
- * @see ThreadSynchronising
  */
-public class RefreshStrategy {
-    private static final Logger log = LoggerFactory.getLogger(RefreshStrategy.class);
-
-    private final RefreshStrategy[] refreshStrategies;
-
-    /**
-     * Create a new instance consisting of the composite of the passed {@code RefreshStrategy}
-     * instances.
-     * @param refreshStrategies  individual refresh strategies
-     */
-    public RefreshStrategy(RefreshStrategy... refreshStrategies) {
-        this.refreshStrategies = refreshStrategies;
-    }
+public interface RefreshStrategy {
 
     /**
      * Determine whether the given session needs to refresh before the next
@@ -70,162 +49,51 @@ public class RefreshStrategy {
      * @param nanosecondsSinceLastAccess nanoseconds since last access
      * @return  {@code true} if and only if the session needs to refresh.
      */
-    public boolean needsRefresh(long nanosecondsSinceLastAccess) {
-        for (RefreshStrategy r : refreshStrategies) {
-            if (r.needsRefresh(nanosecondsSinceLastAccess)) {
-                return true;
-            }
-        }
-        return false;
-    }
+    boolean needsRefresh(long nanosecondsSinceLastAccess);
 
     /**
-     * Called whenever a session has been refreshed.
-     * <p>
-     * This implementation forwards to the {@code refresh} method of the individual refresh
-     * strategies passed to the constructor.
+     * Composite of zero or more {@code RefreshStrategy} instances,
+     * each of which covers a certain strategy.
      */
-    public void refreshed() {
-        for (RefreshStrategy r : refreshStrategies) {
-            r.refreshed();
-        }
-    }
+    public static class Composite implements RefreshStrategy {
 
-    /**
-     * Called whenever a session has been saved.
-     * <p>
-     * This implementation forwards to the {@code save} method of the individual refresh
-     * strategies passed to the constructor.
-     */
-    public void saved() {
-        for (RefreshStrategy r : refreshStrategies) {
-            r.saved();
-        }
-    }
-
-    /**
-     * Accept the passed visitor.
-     * <p>
-     * This implementation forwards to the {@code accept} method of the individual refresh
-     * strategies passed to the constructor.
-     */
-    public void accept(Visitor visitor) {
-        for (RefreshStrategy r: refreshStrategies) {
-            r.accept(visitor);
-        }
-    }
-
-    /**
-     * Force the next call to {@link #needsRefresh(SessionOperation)} to return {@code true}
for
-     * every {@link Once} strategy in this composite.
-     * <p>
-     * This method is safe for calling concurrently to any other method of this class.
-     */
-    public void refreshAtNextAccess() {
-        accept(new Visitor() {
-            @Override
-            public void visit(Once strategy) {
-                strategy.reset();
-            }
-        });
-    }
-
-    /**
-     * Visitor for traversing the composite.
-     */
-    public static class Visitor {
-        public void visit(Default strategy) {}
-        public void visit(Once strategy) {}
-        public void visit(Timed strategy) {}
-        public void visit(LogOnce strategy) {}
-        public void visit(ThreadSynchronising strategy) {}
-    }
-
-    /**
-     * This refresh strategy does wither always or never refresh depending of the value of
the
-     * {@code refresh} argument passed to its constructor.
-     * <p>
-     */
-    public static class Default extends RefreshStrategy {
-
-        /** A refresh strategy that always refreshed */
-        public static RefreshStrategy ALWAYS = new Default(true);
-
-        /** A refresh strategy that never refreshed */
-        public static RefreshStrategy NEVER = new Default(false);
-
-        /** Value returned from {@code needsRefresh} */
-        protected volatile boolean refresh;
+        private final RefreshStrategy[] refreshStrategies;
 
         /**
-         * @param refresh  value returned from {@code needsRefresh}
+         * Create a new instance consisting of the composite of the
+         * passed {@code RefreshStrategy} instances.
+         * @param refreshStrategies  individual refresh strategies
          */
-        public Default(boolean refresh) {
-            this.refresh = refresh;
+        public Composite(RefreshStrategy... refreshStrategies) {
+            this.refreshStrategies = refreshStrategies;
         }
 
         /**
-         * @return {@link #refresh}
+         * Determine whether the given session needs to refresh before the next
+         * session operation is performed.
+         * <p>
+         * This implementation returns {@code true} if and only if any of the
+         * individual refresh strategies passed to the constructor returns
+         * {@code true}.
+         *
+         * @param nanosecondsSinceLastAccess nanoseconds since last access
+         * @return  {@code true} if and only if the session needs to refresh.
          */
-        @Override
         public boolean needsRefresh(long nanosecondsSinceLastAccess) {
-            return refresh;
-        }
-
-        @Override
-        public void refreshed() {
-        }
-
-        @Override
-        public void saved() {
-        }
-
-        @Override
-        public void accept(Visitor visitor) {
-            visitor.visit(this);
-        }
-    }
-
-    /**
-     * This refresh strategy refreshed exactly once when enabled. Calling
-     * {@link #reset()} enables the strategy.
-     */
-    public static class Once extends Default {
-
-        /**
-         * @param enabled  whether this refresh strategy is initially enabled
-         */
-        public Once(boolean enabled) {
-            super(enabled);
-        }
-
-        /**
-         * Enable this refresh strategy
-         */
-        public void reset() {
-            refresh = true;
-        }
-
-        @Override
-        public void refreshed() {
-            refresh = false;
-        }
-
-        @Override
-        public void saved() {
-            refresh = false;
+            for (RefreshStrategy r : refreshStrategies) {
+                if (r.needsRefresh(nanosecondsSinceLastAccess)) {
+                    return true;
+                }
+            }
+            return false;
         }
 
-        @Override
-        public void accept(Visitor visitor) {
-            visitor.visit(this);
-        }
     }
 
     /**
      * This refresh strategy refreshes after a given timeout of inactivity.
      */
-    public static class Timed extends RefreshStrategy {
+    public static class Timed implements RefreshStrategy {
 
         private final long interval;
 
@@ -242,10 +110,6 @@ public class RefreshStrategy {
             return nanosecondsSinceLastAccess > interval;
         }
 
-        @Override
-        public void accept(Visitor visitor) {
-            visitor.visit(this);
-        }
     }
 
     /**
@@ -254,8 +118,13 @@ public class RefreshStrategy {
      *
      * TODO replace logging with JMX monitoring. See OAK-941
      */
-    public static class LogOnce extends RefreshStrategy {
-        private final Exception initStackTrace = new Exception("The session was created here:");
+    public static class LogOnce implements RefreshStrategy {
+
+        private static final Logger log =
+                LoggerFactory.getLogger(RefreshStrategy.class);
+
+        private final Exception initStackTrace =
+                new Exception("The session was created here:");
 
         private final long interval;
 
@@ -287,65 +156,6 @@ public class RefreshStrategy {
             return false;
         }
 
-        @Override
-        public void accept(Visitor visitor) {
-            visitor.visit(this);
-        }
     }
 
-    /**
-     * This refresh strategy synchronises session states across accesses within the same
thread.
-     */
-    public static class ThreadSynchronising extends RefreshStrategy {
-        /**
-         * ThreadLocal instance to keep track of the save operations performed in the thread
so far
-         * This is is then used to determine if the current session needs to be refreshed
to see the
-         * changes done by another session in current thread.
-         * <p>
-         * <b>Note</b> - This thread local is never cleared. However, we only
store
-         * java.lang.Integer and do not derive from ThreadLocal such that (class loader)
-         * leaks typically associated with thread locals do not occur.
-         */
-        private final ThreadLocal<Long> threadSaveCount;
-
-        private long sessionSaveCount;
-
-        /**
-         * @param threadSaveCount  thread local for tracking thread local state.
-         */
-        public ThreadSynchronising(ThreadLocal<Long> threadSaveCount) {
-            this.threadSaveCount = threadSaveCount;
-            sessionSaveCount = getThreadSaveCount();
-        }
-
-        @Override
-        public boolean needsRefresh(long nanosecondsSinceLastAccess) {
-            // If the threadLocal counter differs from our seen sessionSaveCount so far then
-            // some other session would have done a commit. If that is the case a refresh
would
-            // be required
-            return getThreadSaveCount() != sessionSaveCount;
-        }
-
-        @Override
-        public void refreshed() {
-            // Avoid further refreshing if refreshed already
-            sessionSaveCount = getThreadSaveCount();
-        }
-
-        @Override
-        public void saved() {
-            // Force refreshing on access through other sessions on the same thread
-            threadSaveCount.set(sessionSaveCount = (getThreadSaveCount() + 1));
-        }
-
-        private long getThreadSaveCount() {
-            Long c = threadSaveCount.get();
-            return c == null ? 0 : c;
-        }
-
-        @Override
-        public void accept(Visitor visitor) {
-            visitor.visit(this);
-        }
-    }
 }



Mime
View raw message