jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdue...@apache.org
Subject svn commit: r1519422 - in /jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr: RefreshManager.java RefreshStrategy.java RepositoryImpl.java delegate/SessionDelegate.java
Date Mon, 02 Sep 2013 12:55:56 GMT
Author: mduerig
Date: Mon Sep  2 12:55:55 2013
New Revision: 1519422

URL: http://svn.apache.org/r1519422
Log:
OAK-960 Enable session refresh state coordination between multiple session in single thread
Slice RefreshManager into a composite of RefreshStrategy

Added:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
  (with props)
Removed:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshManager.java
Modified:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java

Added: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java?rev=1519422&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
(added)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
Mon Sep  2 12:55:55 2013
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.jcr;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.apache.jackrabbit.oak.jcr.operation.SessionOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instances of this class determine whether a session needs to be refreshed base on
+ * the current {@link SessionOperation operation} to be performed and past
+ * {@link #refreshed() refreshes} and {@link #saved() saves}.
+ * <p>
+ * Before an operation is performed a session calls {@link #needsRefresh(SessionOperation)},
+ * to determine whether the session needs to be refreshed first. To maintain a session strategy's
+ * state sessions call {@link #refreshed()} right after each refresh operation and
+ * {@link #saved()} right after each save operation.
+ * <p>
+ * {@code RefreshStrategy} is a composite of zero or more {@code RefreshStrategy} instances,
+ * each of which covers a certain strategy.
+ * @see Default
+ * @see Once
+ * @see Timed
+ * @see LogOnce
+ * @see ThreadSynchronising
+ */
+public class RefreshStrategy {
+    private static final Logger log = LoggerFactory.getLogger(RefreshStrategy.class);
+
+    private final RefreshStrategy[] refreshStrategies;
+
+    /**
+     * Create a new instance consisting of the composite of the passed {@code RefreshStrategy}
+     * instances.
+     * @param refreshStrategies  individual refresh strategies
+     */
+    public RefreshStrategy(RefreshStrategy... refreshStrategies) {
+        this.refreshStrategies = refreshStrategies;
+    }
+
+    /**
+     * Determine whether the session needs to refresh before {@code sessionOperation} is
performed.
+     * <p>
+     * This implementation return {@code false} if either {@code sessionsOperation} is an
refresh
+     * operation or a save operation. Otherwise it returns {@code true} if and only if any
of the
+     * individual refresh strategies passed to the constructor returns {@code true}.
+     * @param sessionOperation  operation about to be performed
+     * @return  {@code true} if and only if the session needs to refresh.
+     */
+    public boolean needsRefresh(SessionOperation<?> sessionOperation) {
+        // Don't refresh if this operation is a refresh operation itself or
+        // a save operation, which does an implicit refresh
+        if (sessionOperation.isRefresh() || sessionOperation.isSave()) {
+            return false;
+        }
+
+        boolean refresh = false;
+        // Don't shortcut here since the individual strategies rely on side effects of this
call
+        for (RefreshStrategy r : refreshStrategies) {
+            refresh |= r.needsRefresh(sessionOperation);
+        }
+        return refresh;
+    }
+
+    /**
+     * Called whenever a session has been refreshed.
+     * <p>
+     * This implementation forwards to the {@code refresh} method of the individual refresh
+     * strategies passed to the constructor.
+     */
+    public void refreshed() {
+        for (RefreshStrategy r : refreshStrategies) {
+            r.refreshed();
+        }
+    }
+
+    /**
+     * Called whenever a session has been saved.
+     * <p>
+     * This implementation forwards to the {@code save} method of the individual refresh
+     * strategies passed to the constructor.
+     */
+    public void saved() {
+        for (RefreshStrategy r : refreshStrategies) {
+            r.saved();
+        }
+    }
+
+    /**
+     * Accept the passed visitor.
+     * <p>
+     * This implementation forwards to the {@code accept} method of the individual refresh
+     * strategies passed to the constructor.
+     */
+    public void accept(Visitor visitor) {
+        for (RefreshStrategy r: refreshStrategies) {
+            r.accept(visitor);
+        }
+    }
+
+    /**
+     * Visitor for traversing the composite.
+     */
+    public static class Visitor {
+        public void visit(Default strategy) {}
+        public void visit(Once strategy) {}
+        public void visit(Timed strategy) {}
+        public void visit(LogOnce strategy) {}
+        public void visit(ThreadSynchronising strategy) {}
+    }
+
+    /**
+     * This refresh strategy does wither always or never refresh depending of the value of
the
+     * {@code refresh} argument passed to its constructor.
+     * <p>
+     */
+    public static class Default extends RefreshStrategy {
+
+        /** A refresh strategy that always refreshed */
+        public static RefreshStrategy ALWAYS = new Default(true);
+
+        /** A refresh strategy that never refreshed */
+        public static RefreshStrategy NEVER = new Default(false);
+
+        /** Value returned from {@code needsRefresh} */
+        protected boolean refresh;
+
+        /**
+         * @param refresh  value returned from {@code needsRefresh}
+         */
+        public Default(boolean refresh) {
+            this.refresh = refresh;
+        }
+
+        /**
+         * @return {@link #refresh}
+         */
+        @Override
+        public boolean needsRefresh(SessionOperation<?> sessionOperation) {
+            return refresh;
+        }
+
+        @Override
+        public void refreshed() {
+        }
+
+        @Override
+        public void saved() {
+        }
+
+        @Override
+        public void accept(Visitor visitor) {
+            visitor.visit(this);
+        }
+    }
+
+    /**
+     * This refresh strategy refreshed exactly once when enabled. Calling
+     * {@link #reset()} enables the strategy.
+     */
+    public static class Once extends Default {
+
+        /** Visitor for resetting this refresh strategy */
+        public static final Visitor RESETTING_VISITOR = new Visitor() {
+            @Override
+            public void visit(Once strategy) {
+                strategy.reset();
+            }
+        };
+
+        /**
+         * @param enabled  whether this refresh strategy is initially enabled
+         */
+        public Once(boolean enabled) {
+            super(enabled);
+        }
+
+        /**
+         * Enable this refresh strategy
+         */
+        public void reset() {
+            refresh = true;
+        }
+
+        @Override
+        public void refreshed() {
+            refresh = false;
+        }
+
+        @Override
+        public void saved() {
+            refresh = false;
+        }
+
+        @Override
+        public void accept(Visitor visitor) {
+            visitor.visit(this);
+        }
+    }
+
+    /**
+     * This refresh strategy refreshes after a given timeout of inactivity.
+     */
+    public static class Timed extends RefreshStrategy {
+        private final long interval;
+        private long lastAccessed = System.currentTimeMillis();
+
+        /**
+         * @param interval  Interval in seconds after which a session should refresh if there
was no
+         *                  activity.
+         */
+        public Timed(long interval) {
+            this.interval = MILLISECONDS.convert(interval, SECONDS);
+        }
+
+        /**
+         * Called whenever {@code needsRefresh} determines that the time out interval was
exceeded.
+         * This default implementation always returns {@code true}. Descendants may override
this
+         * method to provide more refined behaviour.
+         * @param timeElapsed  the time that elapsed since the session was last accessed.
+         * @return {@code true}
+         */
+        protected boolean timeOut(long timeElapsed) {
+            return true;
+        }
+
+        @Override
+        public boolean needsRefresh(SessionOperation<?> sessionOperation) {
+            long now = System.currentTimeMillis();
+            long timeElapsed = now - lastAccessed;
+            lastAccessed = now;
+            return timeElapsed > interval && timeOut(timeElapsed);
+        }
+
+        @Override
+        public void refreshed() {
+            lastAccessed = System.currentTimeMillis();
+        }
+
+        @Override
+        public void saved() {
+            lastAccessed = System.currentTimeMillis();
+        }
+
+        @Override
+        public void accept(Visitor visitor) {
+            visitor.visit(this);
+        }
+    }
+
+    /**
+     * This refresh strategy never refreshed the session but logs a warning if a session
has been
+     * idle for more than a given time.
+     *
+     * TODO replace logging with JMX monitoring. See OAK-941
+     */
+    public static class LogOnce extends Timed {
+        private final Exception initStackTrace = new Exception("The session was created here:");
+
+        private boolean warnIfIdle = true;
+
+        /**
+         * @param interval  Interval in seconds after which a warning is logged if there
was no
+         *                  activity.
+         */
+        public LogOnce(long interval) {
+            super(interval);
+        }
+
+        /**
+         * Log once
+         * @param timeElapsed  the time that elapsed since the session was last accessed.
+         * @return  {@code false}
+         */
+        @Override
+        protected boolean timeOut(long timeElapsed) {
+            if (warnIfIdle) {
+                log.warn("This session has been idle for " + MINUTES.convert(
+                        timeElapsed, MILLISECONDS) + " minutes and might be out of date.
" +
+                        "Consider using a fresh session or explicitly refresh the session.",
+                        initStackTrace);
+                warnIfIdle = false;
+            }
+            return false;
+        }
+
+        @Override
+        public void accept(Visitor visitor) {
+            visitor.visit(this);
+        }
+    }
+
+    /**
+     * This refresh strategy synchronises session states across accesses within the same
thread.
+     */
+    public static class ThreadSynchronising extends RefreshStrategy {
+        /**
+         * ThreadLocal instance to keep track of the save operations performed in the thread
so far
+         * This is is then used to determine if the current session needs to be refreshed
to see the
+         * changes done by another session in current thread.
+         * <p>
+         * <b>Note</b> - This thread local is never cleared. However, we only
store
+         * java.lang.Integer and do not derive from ThreadLocal such that (class loader)
+         * leaks typically associated with thread locals do not occur.
+         */
+        private final ThreadLocal<Long> threadSaveCount;
+
+        private long sessionSaveCount;
+
+        /**
+         * @param threadSaveCount  thread local for tracking thread local state.
+         */
+        public ThreadSynchronising(ThreadLocal<Long> threadSaveCount) {
+            this.threadSaveCount = threadSaveCount;
+            sessionSaveCount = getThreadSaveCount();
+        }
+
+        @Override
+        public boolean needsRefresh(SessionOperation<?> sessionOperation) {
+            // If the threadLocal counter differs from our seen sessionSaveCount so far then
+            // some other session would have done a commit. If that is the case a refresh
would
+            // be required
+            return getThreadSaveCount() != sessionSaveCount;
+        }
+
+        @Override
+        public void refreshed() {
+            // Avoid further refreshing if refreshed already
+            sessionSaveCount = getThreadSaveCount();
+        }
+
+        @Override
+        public void saved() {
+            // Force refreshing on access through other sessions on the same thread
+            threadSaveCount.set(sessionSaveCount = (getThreadSaveCount() + 1));
+        }
+
+        private long getThreadSaveCount() {
+            Long c = threadSaveCount.get();
+            return c == null ? 0 : c;
+        }
+
+        @Override
+        public void accept(Visitor visitor) {
+            visitor.visit(this);
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java?rev=1519422&r1=1519421&r2=1519422&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
(original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
Mon Sep  2 12:55:55 2013
@@ -17,8 +17,6 @@
 package org.apache.jackrabbit.oak.jcr;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.util.Collections;
 import java.util.Map;
@@ -39,6 +37,10 @@ import org.apache.jackrabbit.api.securit
 import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy.LogOnce;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy.Once;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy.ThreadSynchronising;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy.Timed;
 import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
@@ -59,21 +61,15 @@ public class RepositoryImpl implements J
      * Name of the session attribute value determining the session refresh
      * interval in seconds.
      *
-     * @see RefreshManager
+     * @see RefreshStrategy
      */
     public static final String REFRESH_INTERVAL = "oak.refresh-interval";
 
-    /**
-     * Default value for {@link #REFRESH_INTERVAL}.
-     */
-    private static final long DEFAULT_REFRESH_INTERVAL = Long.getLong(
-            "default-refresh-interval", Long.MAX_VALUE);
-
     private final Descriptors descriptors = new Descriptors(new SimpleValueFactory());
     private final ContentRepository contentRepository;
     protected final Whiteboard whiteboard;
     private final SecurityProvider securityProvider;
-    private final ThreadLocal<Integer> threadSafeCount;
+    private final ThreadLocal<Long> threadSaveCount;
 
     public RepositoryImpl(@Nonnull ContentRepository contentRepository,
                           @Nonnull Whiteboard whiteboard,
@@ -81,7 +77,7 @@ public class RepositoryImpl implements J
         this.contentRepository = checkNotNull(contentRepository);
         this.whiteboard = checkNotNull(whiteboard);
         this.securityProvider = checkNotNull(securityProvider);
-        this.threadSafeCount = new ThreadLocal<Integer>();
+        this.threadSaveCount = new ThreadLocal<Long>();
     }
 
     //---------------------------------------------------------< Repository >---
@@ -203,18 +199,13 @@ public class RepositoryImpl implements J
             } else if (attributes.containsKey(REFRESH_INTERVAL)) {
                 throw new RepositoryException("Duplicate attribute '" + REFRESH_INTERVAL
+ "'.");
             }
-            if (refreshInterval == null) {
-                refreshInterval = DEFAULT_REFRESH_INTERVAL;
-            }
 
+            RefreshStrategy refreshStrategy = createRefreshStrategy(refreshInterval);
             ContentSession contentSession = contentRepository.login(credentials, workspaceName);
-            RefreshManager refreshManager = new RefreshManager(
-                    MILLISECONDS.convert(refreshInterval, SECONDS), threadSafeCount);
             SessionDelegate sessionDelegate = new SessionDelegate(
-                    contentSession, refreshManager, securityProvider);
+                    contentSession, refreshStrategy, securityProvider);
             SessionContext context = createSessionContext(
-                    Collections.<String, Object>singletonMap(REFRESH_INTERVAL, refreshInterval),
-                    sessionDelegate);
+                    createAttributes(refreshInterval), sessionDelegate);
             return context.getSession();
         } catch (LoginException e) {
             throw new javax.jcr.LoginException(e.getMessage(), e);
@@ -289,4 +280,39 @@ public class RepositoryImpl implements J
         }
     }
 
+    private static Map<String, Object> createAttributes(Long refreshInterval) {
+        return refreshInterval == null
+                ? Collections.<String, Object>emptyMap()
+                : Collections.<String, Object>singletonMap(REFRESH_INTERVAL, refreshInterval);
+    }
+
+    /**
+     * Auto refresh logic for sessions, which is done to enhance backwards compatibility
with
+     * Jackrabbit 2.
+     * <p>
+     * A sessions is automatically refreshed when
+     * <ul>
+     *     <li>it has not been accessed for the number of seconds specified by the
+     *         {@code refreshInterval} parameter,</li>
+     *     <li>an observation event has been delivered to a listener registered from
within this
+     *         session,</li>
+     *     <li>an updated occurred through a different session from <em>within
the same
+     *         thread.</em></li>
+     * </ul>
+     * In addition a warning is logged once per session if the session is accessed after
one
+     * minute of inactivity.
+     */
+    private RefreshStrategy createRefreshStrategy(Long refreshInterval) {
+        return new RefreshStrategy(refreshInterval == null
+                ? new RefreshStrategy[] {
+                new Once(false),
+                new LogOnce(60),
+                new ThreadSynchronising(threadSaveCount)}
+                : new RefreshStrategy[] {
+                new Once(false),
+                new Timed(refreshInterval),
+                new LogOnce(60),
+                new ThreadSynchronising(threadSaveCount)});
+    }
+
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1519422&r1=1519421&r2=1519422&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
(original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
Mon Sep  2 12:55:55 2013
@@ -35,7 +35,7 @@ import org.apache.jackrabbit.oak.api.Roo
 import org.apache.jackrabbit.oak.api.Tree;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.core.IdentifierManager;
-import org.apache.jackrabbit.oak.jcr.RefreshManager;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy;
 import org.apache.jackrabbit.oak.jcr.operation.SessionOperation;
 import org.apache.jackrabbit.oak.jcr.security.AccessManager;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
@@ -52,7 +52,7 @@ public class SessionDelegate {
     static final Logger log = LoggerFactory.getLogger(SessionDelegate.class);
 
     private final ContentSession contentSession;
-    private final RefreshManager refreshManager;
+    private final RefreshStrategy refreshStrategy;
 
     private final Root root;
     private final IdentifierManager idManager;
@@ -71,13 +71,13 @@ public class SessionDelegate {
      * dispatcher in order.
      *
      * @param contentSession  the content session
-     * @param refreshManager  the refresh manager used to handle auto refreshing this session
+     * @param refreshStrategy  the refresh strategy used for auto refreshing this session
      * @param securityProvider the security provider
      */
-    public SessionDelegate(@Nonnull ContentSession contentSession, RefreshManager refreshManager,
+    public SessionDelegate(@Nonnull ContentSession contentSession, RefreshStrategy refreshStrategy,
             SecurityProvider securityProvider) {
         this.contentSession = checkNotNull(contentSession);
-        this.refreshManager = checkNotNull(refreshManager);
+        this.refreshStrategy = checkNotNull(refreshStrategy);
         this.root = contentSession.getLatestRoot();
         this.idManager = new IdentifierManager(root);
         this.permissionProvider = checkNotNull(securityProvider)
@@ -86,7 +86,7 @@ public class SessionDelegate {
     }
 
     public synchronized void refreshAtNextAccess() {
-        refreshManager.refreshAtNextAccess();
+        refreshStrategy.accept(RefreshStrategy.Once.RESETTING_VISITOR);
     }
 
     /**
@@ -106,8 +106,9 @@ public class SessionDelegate {
         // Synchronize to avoid conflicting refreshes from concurrent JCR API calls
         if (sessionOpCount == 0) {
             // Refresh and precondition checks only for non re-entrant session operations
-            if (refreshManager.needsRefresh(sessionOperation)) {
+            if (refreshStrategy.needsRefresh(sessionOperation)) {
                 refresh(true);
+                refreshStrategy.refreshed();
                 updateCount++;
             }
             sessionOperation.checkPreconditions();
@@ -120,6 +121,11 @@ public class SessionDelegate {
             if (sessionOperation.isUpdate()) {
                 updateCount++;
             }
+            if (sessionOperation.isSave()) {
+                refreshStrategy.saved();
+            } else if (sessionOperation.isRefresh()) {
+                refreshStrategy.refreshed();
+            }
         }
     }
 



Mime
View raw message