incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r708132 - in /incubator/sling/trunk/extensions/event/src/main: java/org/apache/sling/event/EventUtil.java java/org/apache/sling/event/impl/JobEventHandler.java resources/OSGI-INF/metatype/metatype.properties
Date Mon, 27 Oct 2008 10:58:41 GMT
Author: cziegeler
Date: Mon Oct 27 03:58:40 2008
New Revision: 708132

URL: http://svn.apache.org/viewvc?rev=708132&view=rev
Log:
SLING-339 Implementing a job acknowledge mechanism: After a configured time, all started jobs are
checked to see whether someone has started to process them. If not, they're requeued. If more than one job processor
wants to process a job, only the first one is used to notify the job event handler of success/failure.

Modified:
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=708132&r1=708131&r2=708132&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
(original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
Mon Oct 27 03:58:40 2008
@@ -190,39 +190,50 @@
     public static boolean isJobEvent(Event event) {
         return event.getProperty(PROPERTY_JOB_TOPIC) != null;
     }
+
     /**
-     * Notify a finished job.
+     * Check if this is a job event and return the notifier context.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier
context.
      */
-    public static void finishedJob(Event job) {
+    private static JobStatusNotifier.NotifierContext getNotifierContext(final Event job)
{
         // check if this is a job event
         if ( !isJobEvent(job) ) {
-            return;
+            return null;
         }
         final JobStatusNotifier.NotifierContext ctx = (NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
         if ( ctx == null ) {
-            throw new NullPointerException("JobStatusNotifier context is not available in
event properties.");
+            throw new IllegalArgumentException("JobStatusNotifier context is not available
in event properties.");
+        }
+        return ctx;
+    }
+
+    /**
+     * Notify a finished job.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier
context.
+     */
+    public static void finishedJob(Event job) {
+        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        if ( ctx != null ) {
+            ctx.notifier.finishedJob(job, ctx.eventNodePath, false);
         }
-        ctx.notifier.finishedJob(job, ctx.eventNodePath, false);
     }
 
     /**
      * Notify a failed job.
      * @return <code>true</code> if the job has been rescheduled, <code>false</code>
otherwise.
+     * @throws IllegalArgumentException If the event is a job event but does not have a notifier
context.
      */
     public static boolean rescheduleJob(Event job) {
-        // check if this is a job event
-        if ( !isJobEvent(job) ) {
-            return false;
+        final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+        if ( ctx != null ) {
+           return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
         }
-        final JobStatusNotifier.NotifierContext ctx = (NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
-        if ( ctx == null ) {
-            throw new NullPointerException("JobStatusNotifier context is not available in
event properties.");
-        }
-        return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
+        return false;
     }
 
     /**
      * Process a job in the background and notify its success.
+     * This method also sends an acknowledge message to the job event handler.
      */
     public static void processJob(final Event job, final JobProcessor processor) {
         final Runnable task = new Runnable() {
@@ -232,17 +243,31 @@
              */
             public void run() {
                 boolean result = false;
+                boolean notifyResult = true;
                 try {
+                    // first check for a notifier context to send an acknowledge
+                    final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+                    if ( ctx != null ) {
+                        if ( !ctx.notifier.sendAcknowledge(job, ctx.eventNodePath) ) {
+                            // if we don't get an ack, someone else is already processing
this job.
+                            // we process but do not notify the job event handler.
+                            LoggerFactory.getLogger(EventUtil.class).info("Someone else is
already processing job {}.", job);
+                            notifyResult = false;
+                        }
+                    }
+
                     result = processor.process(job);
                 } catch (Throwable t) {
-                    LoggerFactory.getLogger(EventUtil.class).error("Unhandled error occured
in job processor " + t.getMessage(), t);
+                    LoggerFactory.getLogger(EventUtil.class).error("Unhandled error occured
in job processor " + t.getMessage() + " while processing job " + job, t);
                     // we don't reschedule if an exception occurs
                     result = true;
                 } finally {
-                    if ( result ) {
-                        EventUtil.finishedJob(job);
-                    } else {
-                        EventUtil.rescheduleJob(job);
+                    if ( notifyResult ) {
+                        if ( result ) {
+                            EventUtil.finishedJob(job);
+                        } else {
+                            EventUtil.rescheduleJob(job);
+                        }
                     }
                 }
             }
@@ -277,6 +302,15 @@
         }
 
         /**
+         * Send an acknowledge message that someone is processing the job.
+         * @param job The job.
+         * @param eventNodePath The storage node in the repository.
+         * @return <code>true</code> if the ack is ok, <code>false</code>
otherwise (e.g. if
+         *   someone else already sent an ack for this job).
+         */
+        boolean sendAcknowledge(Event job, String eventNodePath);
+
+        /**
          * Notify that the job is finished.
          * If the job is not rescheduled, a return value of <code>false</code>
indicates an error
          * during the processing. If the job should be rescheduled, <code>true</code>
indicates

Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=708132&r1=708131&r2=708132&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Mon Oct 27 03:58:40 2008
@@ -51,6 +51,7 @@
 import org.apache.sling.event.EventPropertiesMap;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.JobStatusProvider;
+import org.osgi.framework.Constants;
 import org.osgi.service.component.ComponentConstants;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.event.Event;
@@ -70,7 +71,7 @@
  * We schedule this event handler to run in the background and clean up
  * obsolete events.
  * @scr.service interface="java.lang.Runnable"
- * @scr.property name="scheduler.period" value="600" type="Long"
+ * @scr.property name="scheduler.period" value="300" type="Long" label="%jobscheduler.period.name"
description="%jobscheduler.period.description"
  * @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true"
  */
 public class JobEventHandler
@@ -95,12 +96,21 @@
     /** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
     private static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";
 
+    /** Default number of seconds to wait for an ack. */
+    private static final long DEFAULT_WAIT_FOR_ACK = 90; // by default we wait 90 secs
+
+    /** @scr.property valueRef="DEFAULT_WAIT_FOR_ACK" */
+    private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
+
     /** We check every 30 secs by default. */
     private long sleepTime;
 
     /** How often should a job be retried by default. */
     private int maxJobRetries;
 
+    /** How long do we wait for an ack (in ms) */
+    private long waitForAckMs;
+
     /** Background session. */
     private Session backgroundSession;
 
@@ -110,10 +120,10 @@
     /** List of deleted jobs. */
     private Set<String>deletedJobs = new HashSet<String>();
 
-    /** Default clean up time is 10 minutes. */
-    private static final int DEFAULT_CLEANUP_PERIOD = 10;
+    /** Default clean up time is 5 minutes. */
+    private static final int DEFAULT_CLEANUP_PERIOD = 5;
 
-    /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
+    /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" label="%jobcleanup.period.name"
description="%jobcleanup.period.description" */
     private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
 
     /** We remove everything which is older than 5 min by default. */
@@ -125,6 +135,9 @@
     /** Our component context. */
     private ComponentContext componentContext;
 
+    /** The map of events we're currently processing. */
+    private final Map<String, StartedJobInfo> processingEventsList = new HashMap<String,
StartedJobInfo>();
+
     public static ThreadPool JOB_THREAD_POOL;
 
     /**
@@ -139,6 +152,7 @@
         this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD),
DEFAULT_CLEANUP_PERIOD);
         this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
         this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES),
DEFAULT_MAX_JOB_RETRIES);
+        this.waitForAckMs = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK)
* 1000;
         this.componentContext = context;
         super.activate(context);
         JOB_THREAD_POOL = this.threadPool;
@@ -199,37 +213,63 @@
      * @see java.lang.Runnable#run()
      */
     public void run() {
-        if ( this.cleanupPeriod > 0 && this.running ) {
-            this.logger.debug("Cleaning up repository, removing all finished jobs older than
{} minutes.", this.cleanupPeriod);
-
-            final String queryString = this.getCleanUpQueryString();
-            // we create an own session for concurrency issues
-            Session s = null;
-            try {
-                s = this.createSession();
-                final Node parentNode = (Node)s.getItem(this.repositoryPath);
-                logger.debug("Executing query {}", queryString);
-                final Query q = s.getWorkspace().getQueryManager().createQuery(queryString,
Query.XPATH);
-                final NodeIterator iter = q.execute().getNodes();
-                int count = 0;
-                while ( iter.hasNext() ) {
-                    final Node eventNode = iter.nextNode();
-                    eventNode.remove();
-                    count++;
+        if ( this.running ) {
+            // check for jobs that were started but never got an acknowledge
+            final long tooOld = System.currentTimeMillis() - this.waitForAckMs;
+            // to keep the synchronized block as fast as possible we just store the
+            // jobs to be removed in a new list and process this list afterwards
+            final List<StartedJobInfo> restartJobs = new ArrayList<StartedJobInfo>();
+            synchronized ( this.processingEventsList ) {
+                final Iterator<Map.Entry<String, StartedJobInfo>> i = this.processingEventsList.entrySet().iterator();
+                while ( i.hasNext() ) {
+                    final Map.Entry<String, StartedJobInfo> entry = i.next();
+                    if ( entry.getValue().started <= tooOld ) {
+                        restartJobs.add(entry.getValue());
+                        i.remove();
+                    }
                 }
-                parentNode.save();
-                logger.debug("Removed {} entries from the repository.", count);
+            }
+            final Iterator<StartedJobInfo> jobIter = restartJobs.iterator();
+            while ( jobIter.hasNext() ) {
+                final StartedJobInfo info = jobIter.next();
+                this.logger.info("No acknowledge received for job {} stored at {}. Requeueing
job.", info.event, info.nodePath);
+                this.finishedJob(info.event, info.nodePath, true);
+            }
+
+            // remove obsolete jobs from the repository
+            if ( this.cleanupPeriod > 0 ) {
+                this.logger.debug("Cleaning up repository, removing all finished jobs older
than {} minutes.", this.cleanupPeriod);
+
+                final String queryString = this.getCleanUpQueryString();
+                // we create an own session for concurrency issues
+                Session s = null;
+                try {
+                    s = this.createSession();
+                    final Node parentNode = (Node)s.getItem(this.repositoryPath);
+                    logger.debug("Executing query {}", queryString);
+                    final Query q = s.getWorkspace().getQueryManager().createQuery(queryString,
Query.XPATH);
+                    final NodeIterator iter = q.execute().getNodes();
+                    int count = 0;
+                    while ( iter.hasNext() ) {
+                        final Node eventNode = iter.nextNode();
+                        eventNode.remove();
+                        count++;
+                    }
+                    parentNode.save();
+                    logger.debug("Removed {} entries from the repository.", count);
 
-            } catch (RepositoryException e) {
-                // in the case of an error, we just log this as a warning
-                this.logger.warn("Exception during repository cleanup.", e);
-            } finally {
-                if ( s != null ) {
-                    s.logout();
+                } catch (RepositoryException e) {
+                    // in the case of an error, we just log this as a warning
+                    this.logger.warn("Exception during repository cleanup.", e);
+                } finally {
+                    if ( s != null ) {
+                        s.logout();
+                    }
                 }
             }
         }
     }
+
     /**
      * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
      */
@@ -343,10 +383,10 @@
         if ( this.running ) {
             this.loadJobs();
         } else {
-            logger.info("Deactivating component due to errors.");
-            // deactivate
             final ComponentContext ctx = this.componentContext;
+            // deactivate
             if ( ctx != null ) {
+                logger.info("Deactivating component {} due to errors during startup.", ctx.getProperties().get(Constants.SERVICE_ID));
                 final String name = (String) componentContext.getProperties().get(
                     ComponentConstants.COMPONENT_NAME);
                 ctx.disableComponent(name);
@@ -736,11 +776,18 @@
         final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
         boolean unlock = true;
         try {
-            final Event jobEvent = this.getJobEvent(event, eventNode.getPath());
+            final String nodePath = eventNode.getPath();
+            final Event jobEvent = this.getJobEvent(event, nodePath);
             eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
             eventNode.save();
             final EventAdmin localEA = this.eventAdmin;
             if ( localEA != null ) {
+                final StartedJobInfo jobInfo = new StartedJobInfo(jobEvent, nodePath, System.currentTimeMillis());
+                // let's add the event to our processing list
+                synchronized ( this.processingEventsList ) {
+                    this.processingEventsList.put(nodePath, jobInfo);
+                }
+
                 // we need async delivery, otherwise we might create a deadlock
                 // as this method runs inside a synchronized block and the finishedJob
                 // method as well!
@@ -931,11 +978,30 @@
     }
 
     /**
+     * @see org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event,
java.lang.String)
+     */
+    public boolean sendAcknowledge(Event job, String eventNodePath) {
+        synchronized ( this.processingEventsList ) {
+            // if the event is still in the processing list, we confirm the ack
+            final Object ack = this.processingEventsList.remove(eventNodePath);
+            return ack != null;
+        }
+
+    }
+
+    /**
      * This is a notification from the component which processed the job.
      *
      * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event,
String, boolean)
      */
     public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule)
{
+        // let's remove the event from our processing list
+        // this is just a sanity check, as usually the job should have been
+        // removed during sendAcknowledge.
+        synchronized ( this.processingEventsList ) {
+            this.processingEventsList.remove(eventNodePath);
+        }
+
         boolean reschedule = shouldReschedule;
         if ( shouldReschedule ) {
             // check if we exceeded the number of retries
@@ -1311,4 +1377,16 @@
             return lock;
         }
     }
+
+    private static final class StartedJobInfo {
+        public final Event event;
+        public final String nodePath;
+        public final long  started;
+
+        public StartedJobInfo(final Event e, final String path, final long started) {
+            this.event = e;
+            this.nodePath = path;
+            this.started = started;
+        }
+    }
 }

Modified: incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=708132&r1=708131&r2=708132&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
(original)
+++ incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Mon Oct 27 03:58:40 2008
@@ -17,11 +17,10 @@
 #  under the License.
 #
 
-
 #
 # This file contains localization strings for configuration labels and
 # descriptions as used in the metatype.xml descriptor generated by the
-# the Sling SCR plugin
+# SCR plugin
 
 #
 # Distribution Event Handler
@@ -36,23 +35,41 @@
 # Job Event Handler
 job.events.name = Job Event Handler 
 job.events.description = Manages job scheduling on a single system as well \
- as a cluster to enable running jobs on a single node only or on all nodes. \
+ as on a cluster. A Job runs only on a single cluster node. \
  The respective scheduling is persisted in the repository and distributed \
- amongst the cluster nodes through Repository Events and locally in the nodes \
- through the OSGi Event Admin.
+ amongst the cluster nodes through repository events. The jobs are started \
+ locally on a single cluster node through the OSGi Event Admin.
 
 sleep.time.name = Retry Interval
 sleep.time.description = The number of milliseconds to sleep between two \
- consecutive retries of a Job which failed and was set to be retried. The \
- default value is 20 seconds. This value is only relevant if there is a single \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 30 seconds. This value is only relevant if there is a single \
  failed job in the queue. If there are multiple failed jobs, each job is \
- retried in turn without and intervening delay.
+ retried in turn without an intervening delay.
+ 
 max.job.retries.name = Maximum Retries
 max.job.retries.description = The maximum number of times a failed job slated \
  for retries is actually retried. If a job has been retried this number of \
  times and still fails, it is not rescheduled and assumed to have failed. The \
  default value is 10.
 
+jobscheduler.period.name = Event Cleanup Interval
+jobscheduler.period.description = Interval in seconds in which jobs older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 5 minutes (300 seconds).
+
+jobcleanup.period.name = Event Cleanup Age
+jobcleanup.period.description = The maximum age in minutes of persisted job to \
+ be purged from the repository during the cleanup run. The default is 5 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository. 
+
+wait.for.ack.name = Acknowledge Waiting Time
+wait.for.ack.description = If a service is processing a job, it acknowledges this \
+ by sending a message to the Job Event Handler. If the Job Event Handler does not \
+ receive such a message in the configured time, it reschedules the job. The configured \
+ time is in seconds (default is 90 secs).
+ 
 #
 # Shared labels
 scheduler.period.name = Event Cleanup Interval



Mime
View raw message