jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chet...@apache.org
Subject svn commit: r1547289 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: Oak.java plugins/index/AsyncIndexUpdate.java spi/whiteboard/WhiteboardUtils.java
Date Tue, 03 Dec 2013 05:43:03 GMT
Author: chetanm
Date: Tue Dec  3 05:43:03 2013
New Revision: 1547289

URL: http://svn.apache.org/r1547289
Log:
OAK-1246 - Make the AsyncIndexUpdate task run only on a single node in a cluster

Implemented this by:
1. Using the async-status flag, along with timeout handling
2. Restricting the scheduler to run the job on only one node. The default scheduler
   does not support this; however, when running in Sling, for example, it would be used

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1547289&r1=1547288&r2=1547289&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Tue Dec
 3 05:43:03 2013
@@ -406,7 +406,7 @@ public class Oak {
 
         if (asyncIndexing) {
             Runnable task = new AsyncIndexUpdate("async", store, indexEditors);
-            WhiteboardUtils.scheduleWithFixedDelay(whiteboard, task, 5);
+            WhiteboardUtils.scheduleWithFixedDelay(whiteboard, task, 5, true);
         }
 
         // FIXME: OAK-810 move to proper workspace initialization

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1547289&r1=1547288&r2=1547289&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
Tue Dec  3 05:43:03 2013
@@ -32,6 +32,7 @@ import com.google.common.base.Objects;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.value.Conversions;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -58,6 +59,12 @@ public class AsyncIndexUpdate implements
     private static final CommitFailedException CONCURRENT_UPDATE = new CommitFailedException(
             "Async", 1, "Concurrent update detected");
 
+    /**
+     * Timeout in minutes after which an async job would be considered as timed out. Another
+     * node in cluster would wait for timeout before taking over a running job
+     */
+    private static final int ASYNC_TIMEOUT = 15;
+
     private final String name;
 
     private final NodeStore store;
@@ -86,6 +93,11 @@ public class AsyncIndexUpdate implements
             return;
         }
 
+        if(isAlreadyRunning(store)){
+            log.debug("Async job found to be already running. Skipping");
+            return;
+        }
+
         preAsyncRun(store);
 
         NodeBuilder builder = store.getRoot().builder();
@@ -151,6 +163,34 @@ public class AsyncIndexUpdate implements
         }
     }
 
+    private boolean isAlreadyRunning(NodeStore store) {
+        NodeState indexState = store.getRoot().getChildNode(IndexConstants.INDEX_DEFINITIONS_NAME);
+
+        //Probably the first run
+        if (!indexState.exists()) {
+            return false;
+        }
+
+        //Check if already running or timed out
+        if ("running".equals(indexState.getString("async-status"))) {
+            PropertyState startTime = indexState.getProperty("async-start");
+            Calendar start = Conversions.convert(startTime.getValue(Type.DATE)).toCalendar();
+            Calendar now = Calendar.getInstance();
+            long delta = now.getTimeInMillis() - start.getTimeInMillis();
+
+            //Check if the job has timed out and we need to take over
+            if (TimeUnit.MILLISECONDS.toMinutes(delta) > ASYNC_TIMEOUT) {
+                log.info("Async job found which stated on {} has timed out in {} minutes.
" +
+                        "This node would take over the job.",
+                        startTime.getValue(Type.DATE), ASYNC_TIMEOUT);
+                return false;
+            }
+            return true;
+        }
+
+        return false;
+    }
+
     private static void preAsyncRunStatus(NodeBuilder builder) {
         builder.getChildNode(IndexConstants.INDEX_DEFINITIONS_NAME)
                 .setProperty("async-status", "running")

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java?rev=1547289&r1=1547288&r2=1547289&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java
(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java
Tue Dec  3 05:43:03 2013
@@ -29,12 +29,21 @@ public class WhiteboardUtils {
     private static final AtomicLong COUNTER = new AtomicLong();
 
     public static Registration scheduleWithFixedDelay(
-            Whiteboard whiteboard, Runnable runnable, long delay) {
+            Whiteboard whiteboard, Runnable runnable, long delayInSeconds) {
+        return scheduleWithFixedDelay(whiteboard, runnable, delayInSeconds, false);
+    }
+
+    public static Registration scheduleWithFixedDelay(
+            Whiteboard whiteboard, Runnable runnable, long delayInSeconds, boolean runOnSingleClusterNode)
{
+        ImmutableMap.Builder<String,Object> builder = ImmutableMap.<String,Object>builder()
+                .put("scheduler.period", delayInSeconds)
+                .put("scheduler.concurrent", false);
+        if (runOnSingleClusterNode) {
+            //Make use of feature while running in Sling SLING-2979
+            builder.put("scheduler.runOn", "SINGLE");
+        }
         return whiteboard.register(
-                Runnable.class, runnable, ImmutableMap.builder()
-                    .put("scheduler.period", delay)
-                    .put("scheduler.concurrent", false)
-                    .build());
+                Runnable.class, runnable, builder.build());
     }
 
     public static <T> Registration registerMBean(



Mime
View raw message