incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r581153 - in /incubator/sling/trunk/event/src/main: java/org/apache/sling/event/impl/ resources/SLING-INF/nodetypes/
Date Tue, 02 Oct 2007 08:07:54 GMT
Author: cziegeler
Date: Tue Oct  2 01:07:54 2007
New Revision: 581153

URL: http://svn.apache.org/viewvc?rev=581153&view=rev
Log:
First implementation of persisting timed events for clustered applications.

Modified:
    incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
    incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
    incubator/sling/trunk/event/src/main/resources/SLING-INF/nodetypes/event.cnd

Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/EventHelper.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/EventHelper.java?rev=581153&r1=581152&r2=581153&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/EventHelper.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/EventHelper.java Tue Oct  2 01:07:54 2007
@@ -37,4 +37,6 @@
     public static final String EVENT_NODE_TYPE = "event:Event";
     public static final String JOBS_NODE_TYPE = "event:Jobs";
     public static final String JOB_NODE_TYPE = "event:Job";
+    public static final String TIMED_EVENTS_NODE_TYPE = "event:TimedEvents";
+    public static final String TIMED_EVENT_NODE_TYPE = "event:TimedEvent";
 }

Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=581153&r1=581152&r2=581153&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Tue Oct  2 01:07:54 2007
@@ -462,7 +462,7 @@
     }
 
     /**
-     * Search for all a node with the corresponding topic and unique key.
+     * Search for a node with the corresponding topic and unique key.
      * @param topic
      * @param key
      * @return The node or null.

Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java?rev=581153&r1=581152&r2=581153&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java Tue Oct  2 01:07:54 2007
@@ -24,15 +24,18 @@
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
 import javax.jcr.RepositoryException;
 import javax.jcr.lock.Lock;
+import javax.jcr.lock.LockException;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.query.Query;
 import javax.jcr.query.QueryManager;
 
+import org.apache.jackrabbit.util.Locked;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.scheduler.Job;
 import org.apache.sling.scheduler.JobContext;
@@ -101,103 +104,180 @@
                 this.ignoreException(e);
             }
             if ( info != null && this.running ) {
+                ScheduleInfo scheduleInfo = null;
                 try {
-                    if ( info.nodePath == null ) {
-                        this.processEvent(info.event);
-                    } else {
-                        this.session.refresh(true);
-                        final Node eventNode = (Node) this.session.getItem(info.nodePath);
-                        if ( !eventNode.isLocked() ) {
-                            // lock node
-                            Lock lock = null;
-                            try {
-                                lock = eventNode.lock(false, true);
-                            } catch (RepositoryException re) {
-                                // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                    scheduleInfo = new ScheduleInfo(info.event);
+                } catch (IllegalArgumentException iae) {
+                    this.logger.error(iae.getMessage());
+                }
+                if ( scheduleInfo != null ) {
+                    try {
+                        // if the node path is null, this is a new event
+                        if ( info.nodePath == null ) {
+                            // write event and update path
+                            // if something went wrong we get the node path and reschedule
+                            info.nodePath = this.persistEvent(info.event, scheduleInfo);
+                            if ( info.nodePath != null ) {
+                                try {
+                                    this.queue.put(info);
+                                } catch (InterruptedException e) {
+                                    // this should never happen, so we ignore it
+                                    this.ignoreException(e);
+                                }
                             }
-                            if ( lock != null ) {
-                                this.processEvent(info.event);
+                        } else {
+                            this.session.refresh(true);
+                            final Node eventNode = (Node) this.session.getItem(info.nodePath);
+                            if ( !eventNode.isLocked() ) {
+                                // lock node
+                                Lock lock = null;
+                                try {
+                                    lock = eventNode.lock(false, true);
+                                } catch (RepositoryException re) {
+                                    // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                                }
+                                if ( lock != null ) {
+                                    // if something went wrong, we reschedule
+                                    if ( !this.processEvent(info.event, scheduleInfo) ) {
+                                        try {
+                                            this.queue.put(info);
+                                        } catch (InterruptedException e) {
+                                            // this should never happen, so we ignore it
+                                            this.ignoreException(e);
+                                        }
+                                    }
+                                }
                             }
                         }
+                    } catch (RepositoryException e) {
+                        // ignore
+                        this.ignoreException(e);
                     }
-                } catch (RepositoryException e) {
-                    // ignore
-                    this.ignoreException(e);
                 }
             }
         }
     }
 
-    protected boolean processEvent(Event event) {
-        if ( this.scheduler != null ) {
-            final Map<String, Serializable> config = new HashMap<String, Serializable>();
-            try {
-                // if the event contains a timed event id or a job id we'll use that as the name
-                String jobName = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID);
-                if ( jobName == null && event.getProperty(EventUtil.PROPERTY_JOB_ID) != null ) {
-                    jobName = "Timed job " + event.getProperty(EventUtil.PROPERTY_JOB_ID);
-                }
-
-                // let's see if a schedule information is specified or if the job should be stopped
-                final String expression = (String) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE);
-                final Long period = (Long) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_PERIOD);
-                final Date date = (Date) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_DATE);
-                int count = 0;
-                if ( expression != null) {
-                    count++;
+    protected String persistEvent(final Event event, final ScheduleInfo scheduleInfo) {
+        try {
+            final Node parentNode = (Node)this.session.getItem(this.repositoryPath);
+            Lock lock = (Lock) new Locked() {
+
+                protected Object run(Node node) throws RepositoryException {
+                    final String jobId = scheduleInfo.getJobId();
+                    // if there is a node, we know that there is exactly one node
+                    final Node foundNode = queryJob(jobId);
+                    if ( scheduleInfo.isStopEvent() ) {
+                        // if this is a stop event, we should remove the node from the repository
+                        // if there is no node someone else was faster and we can ignore this
+                        if ( foundNode != null ) {
+                            try {
+                                foundNode.remove();
+                                parentNode.save();
+                            } catch (LockException le) {
+                                // if someone else has the lock this is fine
+                            }
+                        }
+                        // stop the scheduler
+                        processEvent(event, scheduleInfo);
+                        return null;
+                    }
+                    boolean writeAndSend = false;
+                    // if node is not present, we'll write it, lock it and schedule the event
+                    if ( foundNode == null ) {
+                        writeAndSend = true;
+                    } else {
+                        // node is already in repository, let's check the application id
+                        if ( foundNode.getProperty(EventHelper.NODE_PROPERTY_APPLICATION).getString().equals(applicationId) ) {
+                            // delete old node (deleting is easier than updating...)
+                            foundNode.remove();
+                            parentNode.save();
+                            writeAndSend = true;
+                        }
+                    }
+                    if ( writeAndSend ) {
+                        final Node eventNode = writeEvent(event);
+                        return eventNode.lock(false, true);
+                    }
+                    return null;
                 }
-                if ( period != null ) {
-                    count++;
+            }.with(parentNode, false);
+            if ( lock != null ) {
+                // if something went wrong, we reschedule
+                if ( !this.processEvent(event, scheduleInfo) ) {
+                    final String path = lock.getNode().getPath();
+                    lock.getNode().unlock();
+                    return path;
                 }
-                if ( date != null ) {
-                    count++;
+            }
+        } catch (RepositoryException re ) {
+            // something went wrong, so let's log it
+            this.logger.error("Exception during writing new job to repository.", re);
+        } catch (InterruptedException e) {
+            // This should never happen from the lock, so we ignore it
+            this.ignoreException(e);
+        }
+        return null;
+    }
+
+    /**
+     * Process the event.
+     * If a scheduler is available, a job is scheduled or stopped.
+     * @param event The incomming event.
+     * @return
+     */
+    protected boolean processEvent(final Event event, final ScheduleInfo scheduleInfo) {
+        if ( this.scheduler != null ) {
+            // is this a stop event?
+            if ( scheduleInfo.isStopEvent() ) {
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Stopping timed event " + event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC) + "(" + scheduleInfo.getJobId() + ")");
                 }
-                if ( count > 1 ) {
-                    this.logger.error("Only one configuration property from " + EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE +
-                                      ", " + EventUtil.PROPERTY_TIMED_EVENT_PERIOD +
-                                      ", or " + EventUtil.PROPERTY_TIMED_EVENT_DATE + " should be used.");
-                    return true;
+                try {
+                    this.scheduler.removeJob(scheduleInfo.getJobId());
+                } catch (NoSuchElementException nsee) {
+                    // this can happen if the job is scheduled on another node
+                    // so we can just ignore this
+                }
+                return true;
+            }
+            // we ignore remote job events
+            if ( EventUtil.isLocal(event) ) {
+                return true;
+            }
+
+            // Create configuration for scheduled job
+            final Map<String, Serializable> config = new HashMap<String, Serializable>();
+            // copy properties
+            final Hashtable<String, Object> properties = new Hashtable<String, Object>();
+            config.put("topic", (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
+            final String[] names = event.getPropertyNames();
+            if ( names != null ) {
+                for(int i=0; i<names.length; i++) {
+                    properties.put(names[i], event.getProperty(names[i]));
                 }
-                if ( count == 0 ) {
-                    // let's stop the event
+            }
+            config.put("config", properties);
+
+            try {
+                if ( scheduleInfo.expression != null ) {
                     if ( this.logger.isDebugEnabled() ) {
-                        this.logger.debug("Stopping timed event " + config.get("topic") + "(" + jobName + ")");
+                        this.logger.debug("Adding timed event " + config.get("topic") + "(" + scheduleInfo.getJobId() + ")" + " with cron expression " + scheduleInfo.expression);
                     }
-                    if ( jobName == null ) {
-                        this.logger.error("Unable to stop timed event without proper job name.");
-                    } else {
-                        this.scheduler.removeJob(jobName);
+                    this.scheduler.addJob(scheduleInfo.getJobId(), this, config, scheduleInfo.expression, false);
+                } else if ( scheduleInfo.period != null ) {
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Adding timed event " + config.get("topic") + "(" + scheduleInfo.getJobId() + ")" + " with period " + scheduleInfo.period);
                     }
-                    return true;
+                    this.scheduler.addPeriodicJob(scheduleInfo.getJobId(), this, config, scheduleInfo.period, false);
                 } else {
-                    // copy properties
-                    final Hashtable properties = new Hashtable();
-                    config.put("topic", (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC));
-                    final String[] names = event.getPropertyNames();
-                    if ( names != null ) {
-                        for(int i=0; i<names.length; i++) {
-                            properties.put(names[i], event.getProperty(names[i]));
-                        }
-                    }
-                    config.put("config", properties);
-                    if ( expression != null ) {
-                        if ( this.logger.isDebugEnabled() ) {
-                            this.logger.debug("Adding timed event " + config.get("topic") + "(" + jobName + ")" + " with cron expression " + expression);
-                        }
-                        this.scheduler.addJob(jobName, this, config, expression, false);
-                    } else if ( period != null ) {
-                        if ( this.logger.isDebugEnabled() ) {
-                            this.logger.debug("Adding timed event " + config.get("topic") + "(" + jobName + ")" + " with period " + period);
-                        }
-                        this.scheduler.addPeriodicJob(jobName, this, config, period, false);
-                    } else {
-                        // then it must be date
-                        if ( this.logger.isDebugEnabled() ) {
-                            this.logger.debug("Adding timed event " + config.get("topic") + "(" + jobName + ")" + " with date " + date);
-                        }
-                        this.scheduler.fireJobAt(jobName, this, config, date);
+                    // then it must be date
+                    if ( this.logger.isDebugEnabled() ) {
+                        this.logger.debug("Adding timed event " + config.get("topic") + "(" + scheduleInfo.getJobId() + ")" + " with date " + scheduleInfo.date);
                     }
+                    this.scheduler.fireJobAt(scheduleInfo.getJobId(), this, config, scheduleInfo.date);
                 }
+                return true;
             } catch (Exception e) {
                 this.ignoreException(e);
             }
@@ -274,4 +354,93 @@
         }
     }
 
+    /**
+     * Search for a node with the corresponding topic and unique key.
+     * @param topic
+     * @param key
+     * @return The node or null.
+     * @throws RepositoryException
+     */
+    protected Node queryJob(String jobId) throws RepositoryException {
+        final QueryManager qManager = this.session.getWorkspace().getQueryManager();
+        final StringBuffer buffer = new StringBuffer("/jcr:root");
+        buffer.append(this.repositoryPath);
+        buffer.append("//element(*, ");
+        buffer.append(this.getEventNodeType());
+        buffer.append(") [");
+        buffer.append(EventHelper.NODE_PROPERTY_JOBID);
+        buffer.append(" = '");
+        buffer.append(jobId);
+        buffer.append("']");
+        final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
+        final NodeIterator result = q.execute().getNodes();
+        Node foundNode = null;
+        if ( result.hasNext() ) {
+            foundNode = result.nextNode();
+        }
+        return foundNode;
+    }
+
+    /**
+     * @see org.apache.sling.core.event.impl.JobPersistenceHandler#getContainerNodeType()
+     */
+    protected String getContainerNodeType() {
+        return EventHelper.TIMED_EVENTS_NODE_TYPE;
+    }
+
+    /**
+     * @see org.apache.sling.core.event.impl.JobPersistenceHandler#getEventNodeType()
+     */
+    protected String getEventNodeType() {
+        return EventHelper.TIMED_EVENT_NODE_TYPE;
+    }
+
+    protected static final class ScheduleInfo {
+
+        public final String expression;
+        public final Long   period;
+        public final Date   date;
+        public final String jobId;
+
+        public ScheduleInfo(final Event event)
+        throws IllegalArgumentException {
+            // let's see if a schedule information is specified or if the job should be stopped
+            this.expression = (String) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE);
+            this.period = (Long) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_PERIOD);
+            this.date = (Date) event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_DATE);
+            int count = 0;
+            if ( this.expression != null) {
+                count++;
+            }
+            if ( this.period != null ) {
+                count++;
+            }
+            if ( this.date != null ) {
+                count++;
+            }
+            if ( count > 0 ) {
+                throw new IllegalArgumentException("Only one configuration property from " + EventUtil.PROPERTY_TIMED_EVENT_SCHEDULE +
+                                      ", " + EventUtil.PROPERTY_TIMED_EVENT_PERIOD +
+                                      ", or " + EventUtil.PROPERTY_TIMED_EVENT_DATE + " should be used.");
+            }
+            // we create a job id consisting of the real event topic and an (optional) id
+            // if the event contains a timed event id or a job id we'll append that to the name
+            String topic = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+            if ( topic == null ) {
+                throw new IllegalArgumentException("Timed event does not contain required property " + EventUtil.PROPERTY_TIMED_EVENT_TOPIC);
+            }
+            String id = (String)event.getProperty(EventUtil.PROPERTY_TIMED_EVENT_ID);
+            String jId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
+
+            this.jobId = "TimedEvent: " + topic + ':' + (id != null ? id : "") + ':' + (jId != null ? jId : "");
+        }
+
+        public boolean isStopEvent() {
+            return this.expression == null && this.period == null && this.date == null;
+        }
+
+        public String getJobId() {
+            return this.jobId;
+        }
+    }
 }

Modified: incubator/sling/trunk/event/src/main/resources/SLING-INF/nodetypes/event.cnd
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/resources/SLING-INF/nodetypes/event.cnd?rev=581153&r1=581152&r2=581153&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/resources/SLING-INF/nodetypes/event.cnd (original)
+++ incubator/sling/trunk/event/src/main/resources/SLING-INF/nodetypes/event.cnd Tue Oct  2 01:07:54 2007
@@ -21,7 +21,7 @@
 <nt='http://www.jcp.org/jcr/nt/1.0'>
 <mix='http://www.jcp.org/jcr/mix/1.0'>
 
-[event:Events] > nt:base
+[event:Events] > nt:base, mix:lockable
   + * (event:Event)
 
 [event:Event] > nt:base
@@ -38,5 +38,12 @@
   - event:active (boolean)
   - event:id (string)
   - event:finished (date)
+
+[event:TimedEvents] > event:Events, mix:lockable
+ + * (event:TimedEvent)
+ 
+[event:TimedEvent] > event:Event
+  - event:processor (string)
+  - event:id (string)
 
   



Mime
View raw message