incubator-aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r821564 - /incubator/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java
Date Sun, 04 Oct 2009 17:28:36 GMT
Author: adc
Date: Sun Oct  4 17:28:36 2009
New Revision: 821564

URL: http://svn.apache.org/viewvc?rev=821564&view=rev
Log:
ARIES-12 Event delivery is now synchronous.  New listeners will now not lose events after
replays.  Tardy or ill-behaved listeners are now ignored.

Modified:
    incubator/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java

Modified: incubator/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java
URL: http://svn.apache.org/viewvc/incubator/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java?rev=821564&r1=821563&r2=821564&view=diff
==============================================================================
--- incubator/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java
(original)
+++ incubator/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java
Sun Oct  4 17:28:36 2009
@@ -18,21 +18,26 @@
  */
 package org.apache.aries.blueprint.container;
 
+
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-import org.apache.aries.blueprint.utils.JavaUtils;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
-import org.osgi.framework.Version;
 import org.osgi.service.blueprint.container.BlueprintEvent;
 import org.osgi.service.blueprint.container.BlueprintListener;
 import org.osgi.service.blueprint.container.EventConstants;
@@ -43,50 +48,68 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.aries.blueprint.utils.JavaUtils;
+
 /**
- * TODO: javadoc
+ * The delivery of {@link BlueprintEvent}s is complicated.  The blueprint extender and its
containers use this class to
+ * deliver {@link BlueprintEvent}s.
  *
  * @version $Rev: 760378 $, $Date: 2009-03-31 11:31:38 +0200 (Tue, 31 Mar 2009) $
  */
-public class BlueprintEventDispatcher implements BlueprintListener {
+class BlueprintEventDispatcher implements BlueprintListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BlueprintEventDispatcher.class);
 
+    private final Set<BlueprintListener> listeners = new CopyOnWriteArraySet<BlueprintListener>();
+    private final Map<Bundle, BlueprintEvent> states = new ConcurrentHashMap<Bundle,
BlueprintEvent>();
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
     private final EventAdminListener eventAdminListener;
     private final ServiceTracker containerListenerTracker;
-    private final Map<Bundle, BlueprintEvent> states;
-    private final ExecutorService executor;
 
-    public BlueprintEventDispatcher(final BundleContext bundleContext) {
-        this.states = new ConcurrentHashMap<Bundle, BlueprintEvent>();
-        this.executor = Executors.newSingleThreadExecutor();
+    BlueprintEventDispatcher(final BundleContext bundleContext) {
+
+        assert bundleContext != null;
+
         EventAdminListener listener = null;
         try {
             getClass().getClassLoader().loadClass("org.osgi.service.event.EventAdmin");
             listener = new EventAdminListener(bundleContext);
         } catch (Throwable t) {
             // Ignore, if the EventAdmin package is not available, just don't use it
+            LOGGER.debug("EventAdmin package is not available, just don't use it");
         }
         this.eventAdminListener = listener;
+
         this.containerListenerTracker = new ServiceTracker(bundleContext, BlueprintListener.class.getName(),
new ServiceTrackerCustomizer() {
             public Object addingService(ServiceReference reference) {
                 BlueprintListener listener = (BlueprintListener) bundleContext.getService(reference);
-                sendInitialEvents(listener);
+
+                synchronized (listeners) {
+                    sendInitialEvents(listener);
+                    listeners.add(listener);
+                }
+
                 return listener;
             }
+
             public void modifiedService(ServiceReference reference, Object service) {
             }
+
             public void removedService(ServiceReference reference, Object service) {
+                listeners.remove(service);
                 bundleContext.ungetService(reference);
             }
         });
         this.containerListenerTracker.open();
     }
 
-    protected void sendInitialEvents(BlueprintListener listener) {
-        if (states != null) {
-            for (Map.Entry<Bundle, BlueprintEvent> entry : states.entrySet()) {
-                listener.blueprintEvent(new BlueprintEvent(entry.getValue(), true));
+    private void sendInitialEvents(BlueprintListener listener) {
+        for (Map.Entry<Bundle, BlueprintEvent> entry : states.entrySet()) {
+            try {
+                callListener(listener, new BlueprintEvent(entry.getValue(), true));
+            } catch (RejectedExecutionException ree) {
+                LOGGER.warn("Executor shut down", ree);
+                break;
             }
         }
     }
@@ -95,25 +118,34 @@
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Sending blueprint container event {} for bundle {}", toString(event),
event.getBundle().getSymbolicName());
         }
-        states.put(event.getBundle(), event);
-        executor.submit(new Runnable() {
-            public void run() {
-                callListeners(event);
-                if (eventAdminListener != null) {
-                    eventAdminListener.blueprintEvent(event);
-                }
+
+        synchronized (listeners) {
+            callListeners(event);
+            states.put(event.getBundle(), event);
+        }
+
+        if (eventAdminListener != null) {
+            try {
+                executor.submit(new Runnable() {
+                    public void run() {
+                        eventAdminListener.blueprintEvent(event);
+                    }
+                });
+            } catch (RejectedExecutionException ree) {
+                LOGGER.warn("Executor shut down", ree);
             }
-        });
+        }
     }
 
-    private String toString(BlueprintEvent event) {
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
+    private static String toString(BlueprintEvent event) {
         return "BlueprintEvent[type=" + getEventType(event.getType())
-                        + (event.getDependencies() != null ? ", dependencies=" + Arrays.asList(event.getDependencies())
: "")
-                        + (event.getCause() != null ? ", exception=" + event.getCause().getMessage()
: "")
-                        + "]";
+                + (event.getDependencies() != null ? ", dependencies=" + Arrays.asList(event.getDependencies())
: "")
+                + (event.getCause() != null ? ", exception=" + event.getCause().getMessage()
: "")
+                + "]";
     }
 
-    private String getEventType(int type) {
+    private static String getEventType(int type) {
         switch (type) {
             case BlueprintEvent.CREATING:
                 return "CREATING";
@@ -135,54 +167,72 @@
     }
 
     private void callListeners(BlueprintEvent event) {
-        Object[] listeners = containerListenerTracker.getServices();
-        if (listeners != null) {
-            for (Object listener : listeners) {
-                try {
-                    ((BlueprintListener) listener).blueprintEvent(event);
-                } catch (Throwable e) {
-                    // an exception in a BlueprintListener should not terminate the
-                    // dispatch chain to other listeners
-                }
+        for (final BlueprintListener listener : listeners) {
+            try {
+                callListener(listener, event);
+            } catch (RejectedExecutionException ree) {
+                LOGGER.warn("Executor shut down", ree);
+                break;
             }
         }
     }
 
-    protected void contextDestroyed(Bundle bundle) {
+    private void callListener(final BlueprintListener listener, final BlueprintEvent event)
throws RejectedExecutionException {
+        try {
+            executor.invokeAny(Collections.<Callable<Void>>singleton(new Callable<Void>()
{
+                public Void call() throws Exception {
+                    listener.blueprintEvent(event);
+                    return null;
+                }
+            }), 60L, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            LOGGER.warn("Thread interrupted", ie);
+            Thread.currentThread().interrupt();
+        } catch (TimeoutException te) {
+            LOGGER.warn("Listener timed out, will be ignored", te);
+            listeners.remove(listener);
+        } catch (ExecutionException ee) {
+            LOGGER.warn("Listener caused an exception, will be ignored", ee);
+            listeners.remove(listener);
+        }
+    }
+
+    void contextDestroyed(Bundle bundle) {
         states.remove(bundle);
     }
 
-    public void destroy() {
-        this.executor.shutdown();
+    void destroy() {
+        executor.shutdown();
         // wait for the queued tasks to execute
         try {
-            this.executor.awaitTermination(60, TimeUnit.SECONDS);
+            executor.awaitTermination(60, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             // ignore
         }
-        this.containerListenerTracker.close();
+        containerListenerTracker.close();
         // clean up the EventAdmin tracker if we're using that
-        if (this.eventAdminListener != null) {
+        if (eventAdminListener != null) {
             eventAdminListener.destroy();
         }
     }
 
-    static class EventAdminListener implements BlueprintListener {
+    private static class EventAdminListener implements BlueprintListener {
 
-        private ServiceTracker tracker;
+        private final ServiceTracker tracker;
 
-        public EventAdminListener(BundleContext context) {
+        EventAdminListener(BundleContext context) {
             tracker = new ServiceTracker(context, EventAdmin.class.getName(), null);
             tracker.open();
         }
 
+        @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
         public void blueprintEvent(BlueprintEvent event) {
             EventAdmin eventAdmin = (EventAdmin) tracker.getService();
             if (eventAdmin == null) {
                 return;
             }
 
-            Dictionary<String,Object> props = new Hashtable<String,Object>();
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
             props.put(EventConstants.TYPE, event.getType());
             props.put(EventConstants.EVENT, event);
             props.put(EventConstants.TIMESTAMP, event.getTimestamp());



Mime
View raw message