sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cziege...@apache.org
Subject svn commit: r627381 - in /incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl: JobEventHandler.java TimedEventHandler.java
Date Wed, 13 Feb 2008 12:28:11 GMT
Author: cziegeler
Date: Wed Feb 13 04:28:05 2008
New Revision: 627381

URL: http://svn.apache.org/viewvc?rev=627381&view=rev
Log:
SLING-177: Handle lazy loading of jobs and events if required bundles are not started yet.

Modified:
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=627381&r1=627380&r2=627381&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Feb 13 04:28:05 2008
@@ -42,6 +42,7 @@
 import org.apache.jackrabbit.JcrConstants;
 import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.JobStatusProvider;
+import org.osgi.framework.BundleEvent;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -52,13 +53,18 @@
  *
  * @scr.component
  * @scr.service interface="org.apache.sling.event.JobStatusProvider"
- * @scr.property name="event.topics" valueRef="EventUtil.TOPIC_JOB"
+ * @scr.property name="event.topics" refValues="EventUtil.TOPIC_JOB"
+ *               values.updated="org/osgi/framework/BundleEvent/UPDATED"
+ *               values.started="org/osgi/framework/BundleEvent/STARTED"
  * @scr.property name="repository.path" value="/sling/jobs"
  */
 public class JobEventHandler
     extends AbstractRepositoryEventHandler
     implements EventUtil.JobStatusNotifier, JobStatusProvider {
 
+    /** The topic prefix for bundle events. */
+    protected static final String BUNDLE_EVENT_PREFIX = BundleEvent.class.getName().replace('.', '/') + '/';
+
     /** A map for keeping track of currently processed job topics. */
     protected final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();
 
@@ -87,7 +93,7 @@
         if ( context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) != null ) {
             this.sleepTime = (Long)context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) * 1000;
         } else {
-            this.sleepTime = DEFAULT_SLEEP_TIME;
+            this.sleepTime = DEFAULT_SLEEP_TIME * 1000;
         }
         this.backgroundSession = this.createSession();
         super.activate(context);
@@ -334,21 +340,84 @@
     public void handleEvent(final Event event) {
         // we ignore remote job events
         if ( EventUtil.isLocal(event) ) {
-            final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-
-            //  job topic must be set, otherwise we ignore this event!
-            if ( jobTopic != null ) {
-                // queue the event in order to respond quickly
-                final EventInfo info = new EventInfo();
-                info.event = event;
-                try {
-                    this.writeQueue.put(info);
-                } catch (InterruptedException e) {
-                    // this should never happen
-                    this.ignoreException(e);
+            // check for bundle event
+            if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) {
+                // job event
+                final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+
+                //  job topic must be set, otherwise we ignore this event!
+                if ( jobTopic != null ) {
+                    // queue the event in order to respond quickly
+                    final EventInfo info = new EventInfo();
+                    info.event = event;
+                    try {
+                        this.writeQueue.put(info);
+                    } catch (InterruptedException e) {
+                        // this should never happen
+                        this.ignoreException(e);
+                    }
+                } else {
+                    this.logger.warn("Event does not contain job topic: {}", event);
                 }
+
             } else {
-                this.logger.warn("Event does not contain job topic: {}", event);
+                // bundle event started or updated
+                boolean doIt = false;
+                synchronized ( this.unloadedJobs ) {
+                    if ( this.unloadedJobs.size() > 0 ) {
+                        doIt = true;
+                    }
+                }
+                if ( doIt ) {
+                    final Thread t = new Thread() {
+
+                        public void run() {
+                            synchronized (unloadedJobs) {
+                                Session s = null;
+                                final Set<String> newUnloadedJobs = new HashSet<String>();
+                                newUnloadedJobs.addAll(unloadedJobs);
+                                try {
+                                    for(String path : unloadedJobs ) {
+                                        newUnloadedJobs.remove(path);
+                                        try {
+                                            if ( s.itemExists(path) ) {
+                                                final Node eventNode = (Node) s.getItem(path);
+                                                if ( !eventNode.isLocked() ) {
+                                                    try {
+                                                        final EventInfo info = new EventInfo();
+                                                        info.event = readEvent(eventNode);
+                                                        info.nodePath = path;
+                                                        try {
+                                                            queue.put(info);
+                                                        } catch (InterruptedException e) {
+                                                            // we ignore this exception as this should never occur
+                                                            ignoreException(e);
+                                                        }
+                                                    } catch (ClassNotFoundException cnfe) {
+                                                        newUnloadedJobs.add(path);
+                                                        ignoreException(cnfe);
+                                                    }
+                                                }
+                                            }
+                                        } catch (RepositoryException re) {
+                                            // we ignore this and readd
+                                            newUnloadedJobs.add(path);
+                                            ignoreException(re);
+                                        }
+                                    }
+                                } finally {
+                                    if ( s != null ) {
+                                        s.logout();
+                                    }
+                                    unloadedJobs.clear();
+                                    unloadedJobs.addAll(newUnloadedJobs);
+                                }
+                            }
+                        }
+
+                    };
+                    t.start();
+                }
             }
         }
     }
@@ -473,7 +542,9 @@
                                     }
                                 } catch (ClassNotFoundException cnfe) {
                                     // store path for lazy loading
-                                    this.unloadedJobs.add(nodePath);
+                                    synchronized ( this.unloadedJobs ) {
+                                        this.unloadedJobs.add(nodePath);
+                                    }
                                     this.ignoreException(cnfe);
                                 }
                             }
@@ -523,7 +594,9 @@
                         }
                     } catch (ClassNotFoundException cnfe) {
                         // store path for lazy loading
-                        this.unloadedJobs.add(nodePath);
+                        synchronized ( this.unloadedJobs ) {
+                            this.unloadedJobs.add(nodePath);
+                        }
                         this.ignoreException(cnfe);
                     } catch (RepositoryException re) {
                         this.logger.error("Unable to load stored job from " + nodePath, re);

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java?rev=627381&r1=627380&r2=627381&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java Wed Feb 13 04:28:05 2008
@@ -52,7 +52,9 @@
  * An event handler for timed events.
  *
  * @scr.component inherit="true"
- * @scr.property name="event.topics" value="org/apache/sling/event/timed"
+ * @scr.property name="event.topics" refValues="EventUtil.TOPIC_TIMED_EVENT"
+ *               values.updated="org/osgi/framework/BundleEvent/UPDATED"
+ *               values.started="org/osgi/framework/BundleEvent/STARTED"
  * @scr.property name="repository.path" value="/sling/timed-events"
  */
 public class TimedEventHandler
@@ -325,7 +327,9 @@
                                 }
                             } catch (ClassNotFoundException cnfe) {
                                 // add it to the unloaded set
-                                this.unloadedEvents.add(nodePath);
+                                synchronized (unloadedEvents) {
+                                    this.unloadedEvents.add(nodePath);
+                                }
                                 this.ignoreException(cnfe);
                             }
                         }
@@ -347,14 +351,74 @@
      * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
      */
     public void handleEvent(Event event) {
-        // queue the event in order to respond quickly
-        final EventInfo info = new EventInfo();
-        info.event = event;
-        try {
-            this.queue.put(info);
-        } catch (InterruptedException e) {
-            // this should never happen
-            this.ignoreException(e);
+        if ( event.getTopic().equals(EventUtil.TOPIC_TIMED_EVENT) ) {
+            // queue the event in order to respond quickly
+            final EventInfo info = new EventInfo();
+            info.event = event;
+            try {
+                this.queue.put(info);
+            } catch (InterruptedException e) {
+                // this should never happen
+                this.ignoreException(e);
+            }
+        } else {
+            // bundle event started or updated
+            boolean doIt = false;
+            synchronized ( this.unloadedEvents ) {
+                if ( this.unloadedEvents.size() > 0 ) {
+                    doIt = true;
+                }
+            }
+            if ( doIt ) {
+                final Thread t = new Thread() {
+
+                    public void run() {
+                        synchronized (unloadedEvents) {
+                            Session s = null;
+                            final Set<String> newUnloadedEvents = new HashSet<String>();
+                            newUnloadedEvents.addAll(unloadedEvents);
+                            try {
+                                for(String path : unloadedEvents ) {
+                                    newUnloadedEvents.remove(path);
+                                    try {
+                                        if ( s.itemExists(path) ) {
+                                            final Node eventNode = (Node) s.getItem(path);
+                                            if ( !eventNode.isLocked() ) {
+                                                try {
+                                                    final EventInfo info = new EventInfo();
+                                                    info.event = readEvent(eventNode);
+                                                    info.nodePath = path;
+                                                    try {
+                                                        queue.put(info);
+                                                    } catch (InterruptedException e) {
+                                                        // we ignore this exception as this should never occur
+                                                        ignoreException(e);
+                                                    }
+                                                } catch (ClassNotFoundException cnfe) {
+                                                    newUnloadedEvents.add(path);
+                                                    ignoreException(cnfe);
+                                                }
+                                            }
+                                        }
+                                    } catch (RepositoryException re) {
+                                        // we ignore this and readd
+                                        newUnloadedEvents.add(path);
+                                        ignoreException(re);
+                                    }
+                                }
+                            } finally {
+                                if ( s != null ) {
+                                    s.logout();
+                                }
+                                unloadedEvents.clear();
+                                unloadedEvents.addAll(newUnloadedEvents);
+                            }
+                        }
+                    }
+
+                };
+                t.start();
+            }
         }
     }
 
@@ -452,7 +516,9 @@
                         }
                     } catch (ClassNotFoundException cnfe) {
                         // add it to the unloaded set
-                        this.unloadedEvents.add(nodePath);
+                        synchronized (unloadedEvents) {
+                            this.unloadedEvents.add(nodePath);
+                        }
                         this.ignoreException(cnfe);
                     } catch (RepositoryException re) {
                         // if reading an event fails, we ignore this



Mime
View raw message