ace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r1521521 [2/3] - in /ace/trunk: org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/ org.apache.ace.agent/ org.apache.ace.agent/src/org/apache/ace/agent/ org.apache.ace.agent/src/org/apache/ace/agent/impl/ org.apache.ace.agent/test/or...
Date Tue, 10 Sep 2013 15:14:47 GMT
Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java?rev=1521521&r1=1521520&r2=1521521&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java Tue Sep 10 15:14:46 2013
@@ -24,21 +24,26 @@ import static org.apache.ace.agent.Agent
 import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_STREAMING;
 import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_SYNCDELAY;
 import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_SYNCINTERVAL;
-
+import static org.apache.ace.agent.impl.InternalConstants.AGENT_CONFIG_CHANGED;
+import static org.apache.ace.agent.impl.ConnectionUtil.*;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.ace.agent.DeploymentHandler;
 import org.apache.ace.agent.DownloadHandle;
 import org.apache.ace.agent.DownloadResult;
 import org.apache.ace.agent.DownloadState;
+import org.apache.ace.agent.EventListener;
 import org.apache.ace.agent.FeedbackChannel;
 import org.apache.ace.agent.RetryAfterException;
 import org.osgi.framework.Version;
@@ -47,45 +52,100 @@ import org.osgi.service.deploymentadmin.
 /**
  * Default configurable controller
  */
-public class DefaultController extends ComponentBase implements Runnable {
+public class DefaultController extends ComponentBase implements Runnable, EventListener {
 
     private volatile ScheduledFuture<?> m_scheduledFuture;
     private volatile UpdateInstaller m_updateInstaller;
 
+    private final AtomicBoolean m_disabled;
+    private final AtomicBoolean m_updateStreaming;
+    private final AtomicBoolean m_fixPackage;
+    private final AtomicLong m_maxRetries;
+    private final AtomicLong m_interval;
+    private final AtomicLong m_syncDelay;
+
     public DefaultController() {
         super("controller");
-    }
 
-    @Override
-    protected void onStart() throws Exception {
-        long delay = getConfigurationHandler().getLong(CONFIG_CONTROLLER_SYNCDELAY, 5);
-        scheduleRun(delay);
-        logDebug("Controller scheduled to run in %d seconds", delay);
+        m_disabled = new AtomicBoolean(false);
+        m_interval = new AtomicLong(60);
+        m_syncDelay = new AtomicLong(5);
+
+        m_updateStreaming = new AtomicBoolean(true);
+        m_fixPackage = new AtomicBoolean(true);
+        m_maxRetries = new AtomicLong(1);
     }
 
     @Override
-    protected void onStop() throws Exception {
-        if (m_updateInstaller != null) {
-            m_updateInstaller.reset();
+    public void handle(String topic, Map<String, String> payload) {
+        if (AGENT_CONFIG_CHANGED.equals(topic)) {
+            String value = payload.get(CONFIG_CONTROLLER_DISABLED);
+            if (value != null && !"".equals(value)) {
+                m_disabled.set(Boolean.parseBoolean(value));
+            }
+
+            value = payload.get(CONFIG_CONTROLLER_STREAMING);
+            if (value != null && !"".equals(value)) {
+                m_updateStreaming.set(Boolean.parseBoolean(value));
+            }
+
+            value = payload.get(CONFIG_CONTROLLER_FIXPACKAGES);
+            if (value != null && !"".equals(value)) {
+                m_fixPackage.set(Boolean.parseBoolean(value));
+            }
+
+            value = payload.get(CONFIG_CONTROLLER_SYNCDELAY);
+            if (value != null && !"".equals(value)) {
+                try {
+                    m_syncDelay.set(Long.parseLong(value));
+                }
+                catch (NumberFormatException exception) {
+                    // Ignore...
+                }
+            }
+
+            value = payload.get(CONFIG_CONTROLLER_RETRIES);
+            if (value != null && !"".equals(value)) {
+                try {
+                    m_maxRetries.set(Long.parseLong(value));
+                }
+                catch (NumberFormatException exception) {
+                    // Ignore...
+                }
+            }
+
+            value = payload.get(CONFIG_CONTROLLER_SYNCINTERVAL);
+            if (value != null && !"".equals(value)) {
+                try {
+                    m_interval.set(Long.parseLong(value));
+                }
+                catch (NumberFormatException exception) {
+                    // Ignore...
+                }
+            }
+            
+            logDebug("Config changed: disabled: %s, update: %s, fixPkg: %s, syncDelay: %d, syncInterval: %d, maxRetries: %d", m_disabled.get(), m_updateStreaming.get(), m_fixPackage.get(), m_syncDelay.get(), m_interval.get(), m_maxRetries.get());
         }
-        unscheduleRun();
     }
 
     @Override
     public void run() {
-        boolean disabled = getConfigurationHandler().getBoolean(CONFIG_CONTROLLER_DISABLED, false);
-        long interval = getConfigurationHandler().getLong(CONFIG_CONTROLLER_SYNCINTERVAL, 60);
-        if (disabled) {
-            logDebug("Controller disabled by configuration. Skipping..");
-            scheduleRun(interval);
-            return;
-        }
+        boolean disabled = m_disabled.get();
+        long interval = m_interval.get();
 
-        logDebug("Controller syncing...");
         try {
+            if (disabled) {
+                logDebug("Controller disabled by configuration. Skipping...");
+                return;
+            }
+
+            logDebug("Controller syncing...");
+
             runFeedback();
             runAgentUpdate();
             runDeploymentUpdate();
+
+            logDebug("Sync completed. Rescheduled in %d seconds", interval);
         }
         catch (RetryAfterException e) {
             // any method may throw this causing the sync to abort. The server is busy so no sense in trying
@@ -98,12 +158,39 @@ public class DefaultController extends C
             // we can do but log it as an error and reschedule as usual.
             logError("Sync aborted due to Exception.", e);
         }
-        scheduleRun(interval);
-        logDebug("Sync completed. Rescheduled in %d seconds", interval);
+        finally {
+            scheduleRun(interval);
+        }
+    }
+
+    @Override
+    protected void onInit() throws Exception {
+        getEventsHandler().addListener(this);
+    }
+
+    @Override
+    protected void onStart() throws Exception {
+        long delay = m_syncDelay.get();
+
+        scheduleRun(delay);
+
+        logDebug("Controller scheduled to run in %d seconds", delay);
+    }
+
+    @Override
+    protected void onStop() throws Exception {
+        getEventsHandler().removeListener(this);
+
+        if (m_updateInstaller != null) {
+            m_updateInstaller.reset();
+        }
+
+        unscheduleRun();
     }
 
     private void runFeedback() throws RetryAfterException {
         logDebug("Synchronizing feedback channels");
+
         Set<String> names = getFeedbackChannelNames();
         for (String name : names) {
             FeedbackChannel channel = getFeedbackChannel(name);
@@ -128,7 +215,7 @@ public class DefaultController extends C
         catch (IOException e) {
             // Probably a serious problem due to local IO related to feedback. No cause to abort the sync so we just log
             // it as an error.
-            logError("Exception while Looking up feedback channelnames. This is ");
+            logError("Exception while looking up feedback channel names.");
         }
         return Collections.emptySet();
     }
@@ -147,6 +234,7 @@ public class DefaultController extends C
 
     private void runAgentUpdate() throws RetryAfterException {
         logDebug("Checking for agent update");
+
         Version current = getAgentUpdateHandler().getInstalledVersion();
         SortedSet<Version> available = getAvailableAgentVersions();
         Version highest = Version.emptyVersion;
@@ -160,6 +248,7 @@ public class DefaultController extends C
         }
 
         logInfo("Installing agent update %s => %s", current, highest);
+
         InputStream inputStream = null;
         try {
             inputStream = getAgentUpdateHandler().getInputStream(highest);
@@ -171,6 +260,9 @@ public class DefaultController extends C
             // FIXME Does not cover failed updates and should handle retries
             logWarning("Exception while installing agent update %s", e, highest);
         }
+        finally {
+            closeSilently(inputStream);
+        }
     }
 
     private SortedSet<Version> getAvailableAgentVersions() throws RetryAfterException {
@@ -186,8 +278,8 @@ public class DefaultController extends C
     }
 
     private void runDeploymentUpdate() throws RetryAfterException {
-
         logDebug("Checking for deployment update");
+
         Version current = getDeploymentHandler().getInstalledVersion();
         SortedSet<Version> available = getAvailableDeploymentVersions();
         Version highest = Version.emptyVersion;
@@ -200,9 +292,9 @@ public class DefaultController extends C
             return;
         }
 
-        boolean updateStreaming = getConfigurationHandler().getBoolean(CONFIG_CONTROLLER_STREAMING, true);
-        boolean fixPackage = getConfigurationHandler().getBoolean(CONFIG_CONTROLLER_FIXPACKAGES, true);
-        long maxRetries = getConfigurationHandler().getLong(CONFIG_CONTROLLER_RETRIES, 1);
+        boolean updateStreaming = m_updateStreaming.get();
+        boolean fixPackage = m_fixPackage.get();
+        long maxRetries = m_maxRetries.get();
 
         getUpdateInstaller(updateStreaming).installUpdate(current, highest, fixPackage, maxRetries);
     }
@@ -248,7 +340,7 @@ public class DefaultController extends C
 
     private void unscheduleRun() {
         if (m_scheduledFuture != null)
-            m_scheduledFuture.cancel(true);
+            m_scheduledFuture.cancel(false /* mayInterruptWhileRunning */);
     }
 
     /**
@@ -256,7 +348,6 @@ public class DefaultController extends C
      * delegates the rest to concrete implementations.
      */
     abstract static class UpdateInstaller {
-
         private final DefaultController m_controller;
         private Version m_lastVersion = null;
         private int m_failureCount = 0;
@@ -315,14 +406,12 @@ public class DefaultController extends C
      * UpdateInstaller that provides streaming deployment package install. The install is blocking.
      */
     static class StreamingUpdateInstaller extends UpdateInstaller {
-
         public StreamingUpdateInstaller(DefaultController controller) {
             super(controller);
         }
 
         @Override
         public void doInstallUpdate(Version from, Version to, boolean fix) throws RetryAfterException, DeploymentException, IOException {
-
             getController().logInfo("Installing streaming deployment update %s => %s", from, to);
 
             DeploymentHandler deploymentHandler = getController().getDeploymentHandler();
@@ -346,6 +435,7 @@ public class DefaultController extends C
 
         @Override
         protected void doReset() {
+            // Nop
         }
     }
 
@@ -354,7 +444,6 @@ public class DefaultController extends C
      * completion this installer will reschedule the controller.
      */
     static class DownloadUpdateInstaller extends UpdateInstaller implements DownloadHandle.ProgressListener, DownloadHandle.ResultListener {
-
         // active download state
         private volatile DownloadHandle m_downloadHandle;
         private volatile DownloadResult m_downloadResult = null;
@@ -431,7 +520,7 @@ public class DefaultController extends C
         @Override
         public void completed(DownloadResult result) {
             m_downloadResult = result;
-            getController().logInfo("Deployment package donwload completed for version %s. Rescheduling the controller to run in %d seconds", m_downloadVersion, 1);
+            getController().logInfo("Deployment package download completed for version %s. Rescheduling the controller to run in %d seconds", m_downloadVersion, 1);
             getController().scheduleRun(1);
         }
 

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DependencyTrackerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DependencyTrackerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DependencyTrackerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DependencyTrackerImpl.java Tue Sep 10 15:14:46 2013
@@ -18,10 +18,9 @@
  */
 package org.apache.ace.agent.impl;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
@@ -29,261 +28,278 @@ import org.osgi.framework.Filter;
 import org.osgi.framework.FrameworkUtil;
 import org.osgi.framework.ServiceReference;
 import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
 /**
  * Simple service dependency tracker that tracks a number of required dependencies and provides life-cycle.
  */
 public class DependencyTrackerImpl {
-
-    interface LifecycleCallbacks {
-        void started();
-
-        void stopped();
-    }
-
-    interface DependencyCallback {
+    /**
+     * Called when an individual dependency is (no longer) satisfied.
+     */
+    static interface DependencyCallback {
+        /**
+         * Called when a dependency is updated.
+         * 
+         * @param service
+         *            the new dependency, can be <code>null</code> in case the dependency is no longer available.
+         */
         void updated(Object service);
     }
 
-    private final Set<ServiceDependency> m_dependencies = new HashSet<ServiceDependency>();
-    private final BundleContext m_bundleContext;
-    private final LifecycleCallbacks m_callbacks;
-    private volatile boolean m_tracking = false;
-    private volatile boolean m_started = false;
-
-    public DependencyTrackerImpl(BundleContext bundleContext, LifecycleCallbacks callbacks) {
-        m_bundleContext = bundleContext;
-        m_callbacks = callbacks;
+    /**
+     * Callback interface for reporting the state of the tracked dependencies.
+     */
+    static interface LifecycleCallback {
+        /**
+         * Called when all dependencies are satisfied.
+         */
+        void componentStarted(BundleContext context) throws Exception;
+
+        /**
+         * Called when one or more dependencies are no longer satisfied.
+         */
+        void componentStopped(BundleContext context) throws Exception;
     }
 
-    public BundleContext getBundleContext() {
-        return m_bundleContext;
-    }
+    /**
+     * Represents an actual dependency on an OSGi service.
+     */
+    private static class ServiceDependency {
+        private final DependencyTrackerImpl m_manager;
+        private final DependencyCallback m_calback;
+        private final ServiceTracker m_tracker;
+        // the actual tracked service...
+        private final AtomicReference<Object> m_serviceRef;
 
-    public void addDependency(Class<?> iface, String extraFilter, DependencyCallback inject) throws Exception {
-        synchronized (this) {
-            if (m_tracking) {
-                throw new IllegalStateException("Can not add dependecies while tracking");
-            }
-        }
-        Filter filter = null;
-        if (extraFilter != null) {
-            filter = FrameworkUtil.createFilter("(&(" + Constants.OBJECTCLASS + "=" + iface.getName() + ")" + extraFilter + ")");
-        }
-        else {
-            filter = FrameworkUtil.createFilter("(" + Constants.OBJECTCLASS + "=" + iface.getName() + ")");
+        public ServiceDependency(DependencyTrackerImpl manager, String filterString, DependencyCallback callback) throws Exception {
+            m_manager = manager;
+            m_calback = callback;
+
+            m_tracker = new ServiceDependencyTracker(this, manager.getBundleContext(), FrameworkUtil.createFilter(filterString));
+            m_serviceRef = new AtomicReference<Object>();
         }
-        ServiceDependency dependency = new ServiceDependency(this, filter, inject);
-        m_dependencies.add(dependency);
-    }
 
-    public void startTracking() throws Exception {
-        synchronized (this) {
-            if (m_tracking) {
-                throw new IllegalStateException("Allready started tracking");
-            }
-            m_tracking = true;
+        public Object getService() {
+            return m_serviceRef.get();
         }
-        for (ServiceDependency dependency : m_dependencies) {
-            dependency.startTracking();
+
+        public boolean isServiceAvailable() {
+            return getService() != null;
         }
-    }
 
-    public void stopTracking() {
-        synchronized (this) {
-            if (!m_tracking) {
-                throw new IllegalStateException("Did not start tracking yet");
-            }
-            m_tracking = false;
+        public void startTracking() {
+            m_tracker.open();
         }
-        for (ServiceDependency dependency : m_dependencies) {
-            dependency.stopTracking();
+
+        public void stopTracking() {
+            m_tracker.close();
         }
-    }
 
-    private void update() {
-        // As this is a simple internal implementation we assume we can safely invoke
-        // callbacks while holding locks.
-        synchronized (this) {
-            if (dependenciesAvailable()) {
-                if (m_started) {
-                    stopCallback();
+        void changed(ServiceReference ref) {
+            Object service = (ref == null) ? null : m_manager.getBundleContext().getService(ref);
+            Object oldService;
+            do {
+                oldService = m_serviceRef.get();
+            }
+            while (!m_serviceRef.compareAndSet(oldService, service));
+
+            // Check on reference(!) to determine whether the service is changed...
+            if (oldService != service) {
+                if (m_calback != null) {
+                    m_calback.updated(service);
                 }
-                serviceCallbacks();
-                startCallback();
-            }
-            else {
-                if (m_started) {
-                    stopCallback();
-                    serviceCallbacks();
-                }
-            }
-        }
-    }
 
-    private boolean dependenciesAvailable() {
-        boolean available = true;
-        for (ServiceDependency dependency : m_dependencies) {
-            if (dependency.getService() == null) {
-                available = false;
-                break;
+                m_manager.update();
             }
         }
-        return available;
     }
 
-    private void startCallback() {
-        try {
-            m_callbacks.started();
-            m_started = true;
-        }
-        catch (Exception e) {
-            // really must not happen
-            e.printStackTrace();
+    /**
+     * Service tracker that calls ServiceDependency#changed with the highest matching service reference whenever
+     * something changes.
+     */
+    private static class ServiceDependencyTracker extends ServiceTracker {
+        private final CopyOnWriteArrayList<ServiceReference> m_trackedServiceRefs;
+        private final ServiceDependency m_dependency;
+
+        public ServiceDependencyTracker(ServiceDependency dependency, BundleContext context, Filter filter) {
+            super(context, filter, null);
+            m_dependency = dependency;
+            m_trackedServiceRefs = new CopyOnWriteArrayList<ServiceReference>();
         }
-    }
 
-    private void stopCallback() {
-        try {
-            m_callbacks.stopped();
-            m_started = false;
+        @Override
+        public Object addingService(ServiceReference reference) {
+            if (m_trackedServiceRefs.addIfAbsent(reference)) {
+                checkForUpdate();
+            }
+            return super.addingService(reference);
         }
-        catch (Exception e) {
-            // really must not happen
-            e.printStackTrace();
+
+        @Override
+        public void modifiedService(ServiceReference reference, Object service) {
+            checkForUpdate();
         }
-    }
 
-    private void serviceCallbacks() {
-        for (ServiceDependency dependency : m_dependencies) {
-            try {
-                dependency.invokeCallback();
+        @Override
+        public void removedService(ServiceReference reference, Object service) {
+            if (m_trackedServiceRefs.remove(reference)) {
+                checkForUpdate();
             }
-            catch (Exception e) {
-                // really must not happen
-                e.printStackTrace();
+        }
+
+        private void checkForUpdate() {
+            ServiceReference highestReference = null;
+            for (ServiceReference reference : m_trackedServiceRefs) {
+                if (highestReference == null || highestReference.compareTo(reference) < 1) {
+                    highestReference = reference;
+                }
             }
+
+            m_dependency.changed(highestReference);
         }
     }
 
-    private static class ServiceDependency {
-
-        private final DependencyTrackerImpl m_manager;
-        private final Filter m_filter;
-        private final DependencyCallback m_calback;
-        private final ServiceTracker m_tracker;
-        private volatile Object m_service;
+    private final BundleContext m_bundleContext;
+    private final LifecycleCallback m_callback;
+    private final CopyOnWriteArrayList<ServiceDependency> m_dependencies;
+    private final AtomicBoolean m_tracking;
+    private final AtomicBoolean m_started;
 
-        public ServiceDependency(DependencyTrackerImpl manager, Filter filter, DependencyCallback callback) throws Exception {
-            m_manager = manager;
-            m_filter = filter;
-            m_calback = callback;
-            m_tracker = new ServiceDependencyTracker(this);
-        }
+    /**
+     * Creates a new {@link DependencyTrackerImpl} instance.
+     * 
+     * @param bundleContext
+     *            the bundle context;
+     * @param callback
+     *            the component callback.
+     */
+    public DependencyTrackerImpl(BundleContext bundleContext, LifecycleCallback callback) {
+        m_bundleContext = bundleContext;
+        m_callback = callback;
 
-        public BundleContext getBundleContext() {
-            return m_manager.getBundleContext();
-        }
+        m_dependencies = new CopyOnWriteArrayList<ServiceDependency>();
+        m_tracking = new AtomicBoolean(false);
+        m_started = new AtomicBoolean(false);
+    }
 
-        public Filter getFilter() {
-            return m_filter;
+    /**
+     * Adds a dependency to track.
+     * 
+     * @param iface
+     *            the interface of the dependency to track;
+     * @param extraFilter
+     *            an optional filter for the tracked dependency;
+     * @param callback
+     *            the callback to call when the dependency becomes (un)available.
+     */
+    public void addDependency(Class<?> iface, String extraFilter, DependencyCallback callback) throws Exception {
+        if (m_tracking.get()) {
+            throw new IllegalStateException("Can not add new dependency while tracking is started!");
         }
 
-        public Object getService() {
-            return m_service;
+        String filter = String.format("(%s=%s)", Constants.OBJECTCLASS, iface.getName());
+        if (extraFilter != null) {
+            filter = String.format("(&%s%s)", filter, extraFilter);
         }
 
-        public void startTracking() {
-            if (m_tracker == null) {
-            }
-            m_tracker.open();
-        }
+        m_dependencies.addIfAbsent(new ServiceDependency(this, filter, callback));
+    }
 
-        public void stopTracking() {
-            m_tracker.close();
-        }
+    public BundleContext getBundleContext() {
+        return m_bundleContext;
+    }
 
-        void invokeCallback() {
-            if (m_calback != null) {
-                m_calback.updated(m_service);
-            }
+    /**
+     * Starts tracking all dependencies. If all dependencies are satisfied,
+     * {@link LifecycleCallback#componentStarted(BundleContext)} will be called. For each satisfied dependency,
+     * {@link DependencyCallback#updated(Object)} is called.
+     * 
+     * @throws IllegalStateException
+     *             in case this tracker is already started.
+     */
+    public void startTracking() throws Exception {
+        // This method should be called once and only once...
+        if (!m_tracking.compareAndSet(false, true)) {
+            throw new IllegalStateException("Already started tracking!");
         }
 
-        void changed(Object service) {
-            // Sync on manager to ensure all dependency updates happen in order
-            synchronized (m_manager) {
-                m_service = service;
-                m_manager.update();
-            }
+        for (ServiceDependency dependency : m_dependencies) {
+            dependency.startTracking();
         }
     }
 
     /**
-     * Custom service tracker to simply construction.
+     * Stops tracking of dependencies. For each tracked dependency, {@link DependencyCallback#updated(Object)} is called
+     * with a <code>null</code> value.
      * 
+     * @throws IllegalStateException
+     *             in case this tracker was not started yet.
      */
-    private static class ServiceDependencyTracker extends ServiceTracker {
-
-        public ServiceDependencyTracker(ServiceDependency dependency) {
-            super(dependency.getBundleContext(), dependency.getFilter(), new ServiceDependencyTrackerCustomizer(dependency));
+    public void stopTracking() {
+        // This method should be called once and only once...
+        if (!m_tracking.compareAndSet(true, false)) {
+            throw new IllegalStateException("Did not start tracking yet");
+        }
 
+        for (ServiceDependency dependency : m_dependencies) {
+            dependency.stopTracking();
         }
     }
 
     /**
-     * Tracker customizer that calls AgentContextDependency#changed with the highest matching service whenever something
-     * changes.
+     * Called for each change in the dependency set. It will call
+     * {@link LifecycleCallback#componentStopped(BundleContext)} if needed, and
+     * {@link LifecycleCallback#componentStarted(BundleContext)} when all dependencies are met.
      */
-    private static class ServiceDependencyTrackerCustomizer implements ServiceTrackerCustomizer {
+    public void update() {
+        stopComponent();
 
-        private final Map<ServiceReference, Object> m_trackedServices = new HashMap<ServiceReference, Object>();
-        private final ServiceDependency m_dependency;
-        private volatile Object m_service;
-
-        public ServiceDependencyTrackerCustomizer(ServiceDependency dependency) {
-            m_dependency = dependency;
+        if (allDependenciesAvailable()) {
+            startComponent();
         }
+    }
 
-        @Override
-        public Object addingService(ServiceReference reference) {
-            Object service = m_dependency.getBundleContext().getService(reference);
-            synchronized (m_trackedServices) {
-                m_trackedServices.put(reference, service);
-                checkForUpdate();
-                return service;
+    /**
+     * @return <code>true</code> if all dependencies are available, <code>false</code> otherwise.
+     */
+    final boolean allDependenciesAvailable() {
+        for (ServiceDependency dependency : m_dependencies) {
+            if (!dependency.isServiceAvailable()) {
+                return false;
             }
         }
+        return true;
+    }
 
-        @Override
-        public void modifiedService(ServiceReference reference, Object service) {
-            synchronized (m_trackedServices) {
-                m_trackedServices.put(reference, service);
-                checkForUpdate();
+    /**
+     * Tries to start the component, if it is not already started.
+     */
+    final void startComponent() {
+        // Only call our callback when the component was not started yet...
+        if (m_started.compareAndSet(false, true)) {
+            try {
+                m_callback.componentStarted(m_bundleContext);
             }
-        }
-
-        @Override
-        public void removedService(ServiceReference reference, Object service) {
-            synchronized (m_trackedServices) {
-                m_trackedServices.remove(reference);
-                checkForUpdate();
+            catch (Exception e) {
+                // really must not happen
+                e.printStackTrace();
             }
         }
+    }
 
-        private void checkForUpdate() {
-            ServiceReference highestReference = null;
-            if (!m_trackedServices.isEmpty()) {
-                for (ServiceReference reference : m_trackedServices.keySet()) {
-                    if (highestReference == null || highestReference.compareTo(reference) < 1) {
-                        highestReference = reference;
-                    }
-                }
+    /**
+     * Tries to stop the component, if it is not already stopped.
+     */
+    final void stopComponent() {
+        // Only call our callback when the component was actually started...
+        if (m_started.compareAndSet(true, false)) {
+            try {
+                m_callback.componentStopped(m_bundleContext);
             }
-            Object service = highestReference == null ? null : m_trackedServices.get(highestReference);
-            if (m_service == null || m_service != service) {
-                m_service = service;
-                m_dependency.changed(service);
+            catch (Exception e) {
+                // really must not happen
+                e.printStackTrace();
             }
         }
     }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java Tue Sep 10 15:14:46 2013
@@ -18,11 +18,11 @@
  */
 package org.apache.ace.agent.impl;
 
+import static org.apache.ace.agent.impl.ReflectionUtil.configureField;
+import static org.apache.ace.agent.impl.ReflectionUtil.invokeMethod;
+
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.AccessibleObject;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.HashMap;
@@ -82,11 +82,15 @@ public class DeploymentHandlerImpl exten
     @Override
     public Version getInstalledVersion() {
         Version highestVersion = Version.emptyVersion;
+        String identification = getIdentification();
+
         DeploymentPackage[] installedPackages = m_deploymentAdmin.listDeploymentPackages();
         for (DeploymentPackage installedPackage : installedPackages) {
-            if (installedPackage.getName().equals(getIdentification())
-                && installedPackage.getVersion().compareTo(highestVersion) > 0) {
-                highestVersion = installedPackage.getVersion();
+            String packageId = installedPackage.getName();
+            Version packageVersion = installedPackage.getVersion();
+
+            if (identification.equals(packageId) && packageVersion.compareTo(highestVersion) > 0) {
+                highestVersion = packageVersion;
             }
         }
         return highestVersion;
@@ -118,8 +122,7 @@ public class DeploymentHandlerImpl exten
     };
 
     private URL getPackageURL(Version version, boolean fixPackage) throws RetryAfterException, IOException {
-        URL url = getEndpoint(getServerURL(), getIdentification(), fixPackage ? getInstalledVersion() : Version.emptyVersion, version);
-        return url;
+        return getEndpoint(getServerURL(), getIdentification(), fixPackage ? getInstalledVersion() : Version.emptyVersion, version);
     }
 
     private URL getEndpoint(URL serverURL, String identification) {
@@ -145,41 +148,10 @@ public class DeploymentHandlerImpl exten
         }
     }
 
-    private static void configureField(Object object, Class<?> iface, Object instance) {
-        // Note: Does not check super classes!
-        Field[] fields = object.getClass().getDeclaredFields();
-        AccessibleObject.setAccessible(fields, true);
-        for (int j = 0; j < fields.length; j++) {
-            if (fields[j].getType().equals(iface)) {
-                try {
-                    fields[j].set(object, instance);
-                }
-                catch (Exception e) {
-                    e.printStackTrace();
-                    throw new IllegalStateException("Coudld not set field " + fields[j].getName() + " on " + object);
-                }
-            }
-        }
-    }
-
-    private static Object invokeMethod(Object object, String methodName, Class<?>[] signature, Object[] parameters) {
-        // Note: Does not check super classes!
-        Class<?> clazz = object.getClass();
-        try {
-            Method method = clazz.getDeclaredMethod(methodName, signature);
-            return method.invoke(object, parameters);
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-        }
-        return null;
-    }
-
     /**
      * Internal EventAdmin that delegates to actual InternalEvents. Used to inject into the DeploymentAdmin only.
      */
-    class EventAdminBridge implements EventAdmin {
-
+    final class EventAdminBridge implements EventAdmin {
         @Override
         public void postEvent(Event event) {
             getEventsHandler().postEvent(event.getTopic(), getPayload(event));
@@ -202,8 +174,7 @@ public class DeploymentHandlerImpl exten
     /**
      * Internal LogService that delegates to actual InternalLogger. Used to inject into the DeploymentAdmin only.
      */
-    class LogServiceBridge implements LogService {
-
+    final class LogServiceBridge implements LogService {
         @Override
         public void log(int level, String message) {
             log(level, message, null);
@@ -237,5 +208,4 @@ public class DeploymentHandlerImpl exten
             log(level, message, exception);
         }
     }
-
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DiscoveryHandlerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DiscoveryHandlerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DiscoveryHandlerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DiscoveryHandlerImpl.java Tue Sep 10 15:14:46 2013
@@ -20,111 +20,203 @@ package org.apache.ace.agent.impl;
 
 import static org.apache.ace.agent.AgentConstants.CONFIG_DISCOVERY_CHECKING;
 import static org.apache.ace.agent.AgentConstants.CONFIG_DISCOVERY_SERVERURLS;
+import static org.apache.ace.agent.impl.InternalConstants.AGENT_CONFIG_CHANGED;
 
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.ace.agent.DiscoveryHandler;
+import org.apache.ace.agent.EventListener;
 
 /**
  * Default thread-safe {@link DiscoveryHandler} implementation that reads the serverURL(s) from the configuration using
  * key {@link CONFIG_DISCOVERY_SERVERURLS}. If the {@link CONFIG_DISCOVERY_CHECKING} flag is set, a connection is opened to
  * test whether a serverURL is available before it is returned.
  */
-public class DiscoveryHandlerImpl extends ComponentBase implements DiscoveryHandler {
+public class DiscoveryHandlerImpl extends ComponentBase implements DiscoveryHandler, EventListener {
 
-    private final Map<String, CheckedURL> m_availableURLs = new HashMap<String, DiscoveryHandlerImpl.CheckedURL>();
-    private final Map<String, CheckedURL> m_blacklistedURLs = new HashMap<String, DiscoveryHandlerImpl.CheckedURL>();
+    private static class CheckedURL {
+        /** cache timeout in milliseconds. */
+        private static final long CACHE_TIME = 30000;
+
+        public final URL m_url;
+        private final AtomicLong m_timestamp;
+        private final AtomicBoolean m_blackListed;
+
+        public CheckedURL(URL url) {
+            m_url = url;
+            m_blackListed = new AtomicBoolean(false);
+            m_timestamp = new AtomicLong(0L);
+        }
 
-    private static final long CACHE_TIME = 2000;
+        public void available() {
+            m_blackListed.set(false);
+            m_timestamp.set(System.currentTimeMillis());
+        }
+
+        public void blacklist() {
+            m_blackListed.set(true);
+            m_timestamp.set(System.currentTimeMillis());
+        }
+
+        public boolean isBlacklisted() {
+            boolean result = m_blackListed.get();
+            if (result) {
+                if (!isRecentlyChecked()) {
+                    // lift the ban...
+                    m_blackListed.compareAndSet(result, false);
+                    result = false;
+                }
+            }
+            return result;
+        }
+
+        public boolean isRecentlyChecked() {
+            return m_timestamp.get() > (System.currentTimeMillis() - CACHE_TIME);
+        }
+    }
+
+    /** default server URL. */
+    private static final String DEFAULT_SERVER_URL = "http://localhost:8080";
+    /** whether or not to test server URLs. */
+    private static final boolean DEFAULT_CHECK_SERVER_ULRS = false;
+
+    private final List<String> m_urls;
+    private final AtomicBoolean m_checkURLs;
+    private final ConcurrentMap<String, CheckedURL> m_availableURLs;
 
     public DiscoveryHandlerImpl() {
         super("discovery");
-    }
 
-    @Override
-    protected void onStop() throws Exception {
-        m_availableURLs.clear();
-        m_blacklistedURLs.clear();
+        m_availableURLs = new ConcurrentHashMap<String, CheckedURL>();
+        m_checkURLs = new AtomicBoolean(DEFAULT_CHECK_SERVER_ULRS);
+        m_urls = new ArrayList<String>(Arrays.asList(DEFAULT_SERVER_URL));
     }
 
-    // TODO Pretty naive implementation below. It always takes the first configured URL it can connect to and is not
-    // thread-safe.
+    /**
+     * Returns the first available URL, based on the order specified in the configuration.
+     * 
+     * @return a (valid) server URL, or <code>null</code> in case no server URL was valid.
+     */
     @Override
     public URL getServerUrl() {
-
-        String configValue = getConfigurationHandler().get(CONFIG_DISCOVERY_SERVERURLS, "http://localhost:8080");
-        boolean checking = getConfigurationHandler().getBoolean(CONFIG_DISCOVERY_CHECKING, false);
+        String[] urls;
+        synchronized (m_urls) {
+            urls = new String[m_urls.size()];
+            m_urls.toArray(urls);
+        }
+        boolean checking = m_checkURLs.get();
 
         URL url = null;
-        if (configValue.indexOf(",") == -1) {
-            url = getURL(configValue.trim(), checking);
-        }
-        else {
-            for (String configValuePart : configValue.split(",")) {
-                url = getURL(configValuePart.trim(), checking);
-                if (url != null) {
-                    break;
-                }
+        for (String urlValue : urls) {
+            if ((url = getURL(urlValue, checking)) != null) {
+                break;
             }
         }
+
         if (url == null) {
-            logWarning("No connectable serverUrl available");
+            logWarning("No valid server URL discovered?!");
         }
+
         return url;
     }
 
-    private static class CheckedURL {
-        URL url;
-        long timestamp;
+    @Override
+    public void handle(String topic, Map<String, String> payload) {
+        if (AGENT_CONFIG_CHANGED.equals(topic)) {
+            String value = payload.get(CONFIG_DISCOVERY_SERVERURLS);
+            if (value != null && !"".equals(value.trim())) {
+                String[] urls = value.trim().split("\\s*,\\s*");
+
+                synchronized (m_urls) {
+                    m_urls.clear();
+                    m_urls.addAll(Arrays.asList(urls));
+                }
+                // Assume nothing about the newly configured URLs...
+                m_availableURLs.clear();
+            }
 
-        public CheckedURL(URL url, long timestamp) {
-            this.url = url;
-            this.timestamp = timestamp;
+            value = payload.get(CONFIG_DISCOVERY_CHECKING);
+            if (value != null) {
+                boolean checkURLs = Boolean.parseBoolean(value);
+                // last one wins...
+                m_checkURLs.set(checkURLs);
+            }
         }
     }
 
-    private URL getURL(String serverURL, boolean checking) {
+    @Override
+    protected void onInit() throws Exception {
+        getEventsHandler().addListener(this);
+    }
+
+    @Override
+    protected void onStop() throws Exception {
+        getEventsHandler().removeListener(this);
+
+        m_availableURLs.clear();
+    }
+
+    private URL getURL(String serverURL, boolean checkURL) {
+        CheckedURL checkedURL = null;
+        URL result = null;
 
-        URL url = null;
         try {
-            CheckedURL blackListed = m_blacklistedURLs.get(serverURL);
-            if (blackListed != null && blackListed.timestamp > (System.currentTimeMillis() - CACHE_TIME)) {
-                logDebug("Ignoring blacklisted serverURL: " + serverURL);
-                return null;
-            }
+            logDebug("Start getting URL for : %s", serverURL);
 
-            url = new URL(serverURL);
-            if (!checking) {
-                return url;
+            checkedURL = m_availableURLs.get(serverURL);
+            if (checkedURL == null) {
+                checkedURL = new CheckedURL(new URL(serverURL));
+
+                CheckedURL putResult = m_availableURLs.putIfAbsent(serverURL, checkedURL);
+                if (putResult != null) {
+                    // lost the put, make sure to use the correct object...
+                    checkedURL = putResult;
+                }
             }
 
-            CheckedURL available = m_availableURLs.get(serverURL);
-            if (available != null && available.timestamp > (System.currentTimeMillis() - CACHE_TIME)) {
-                logDebug("Returning available serverURL: " + available.url.toExternalForm());
-                return available.url;
+            if (checkedURL.isBlacklisted()) {
+                logDebug("Ignoring blacklisted serverURL: %s", serverURL);
+                // Take the short way home...
+                return null;
             }
 
-            tryConnect(url);
-            logDebug("Succesfully connected to  serverURL: %s", serverURL);
-            m_availableURLs.put(serverURL, new CheckedURL(url, System.currentTimeMillis()));
-            return url;
+            result = checkedURL.m_url;
+            if (checkURL && !checkedURL.isRecentlyChecked()) {
+                logDebug("Trying to connect to serverURL: %s", serverURL);
+
+                tryConnect(checkedURL.m_url);
+                // no exception was thrown trying to connect to the URL, so assume it's available...
+                checkedURL.available();
+
+                logDebug("Succesfully connected to serverURL: %s", serverURL);
+            }
         }
         catch (MalformedURLException e) {
-            logError("Temporarily blacklisting malformed serverURL: " + serverURL);
-            m_blacklistedURLs.put(serverURL, new CheckedURL(url, System.currentTimeMillis()));
-            return null;
+            logWarning("Ignoring invalid/malformed serverURL: %s", serverURL);
+            // No need to blacklist for this case, we're trying to create a CheckedURL which isn't present...
+            result = null;
         }
         catch (IOException e) {
-            logWarning("Temporarily blacklisting unavailable serverURL: " + serverURL);
-            m_blacklistedURLs.put(serverURL, new CheckedURL(url, System.currentTimeMillis()));
-            return null;
+            logWarning("Temporarily blacklisting unavailable serverURL: %s", serverURL);
+            if (checkedURL != null) {
+                checkedURL.blacklist();
+            }
+            result = null;
         }
+
+        return result;
     }
 
     private void tryConnect(URL serverURL) throws IOException {
@@ -134,8 +226,9 @@ public class DiscoveryHandlerImpl extend
             connection.connect();
         }
         finally {
-            if (connection != null && connection instanceof HttpURLConnection)
+            if (connection instanceof HttpURLConnection) {
                 ((HttpURLConnection) connection).disconnect();
+            }
         }
     }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java Tue Sep 10 15:14:46 2013
@@ -32,7 +32,6 @@ import org.apache.ace.agent.DownloadStat
 /**
  * A {@link DownloadHandle} implementation that supports pause/resume semantics based on HTTP Range headers assuming the
  * server supports this feature.
- * 
  */
 class DownloadHandleImpl implements DownloadHandle {
 
@@ -80,8 +79,9 @@ class DownloadHandleImpl implements Down
     }
 
     DownloadHandle start(int failAtPosition) {
-        if (m_started)
-            throw new IllegalStateException("Can not call start on a handle that is allready started");
+        if (m_started) {
+            throw new IllegalStateException("Can not call start on a handle that is already started");
+        }
         if (m_file == null) {
             try {
                 m_file = File.createTempFile("download", ".bin");
@@ -96,8 +96,9 @@ class DownloadHandleImpl implements Down
 
     @Override
     public DownloadHandle stop() {
-        if (!m_started && !m_completed)
+        if (!m_started && !m_completed) {
             throw new IllegalStateException("Can not call stop on a handle that is not yet started");
+        }
         m_started = false;
         stopDownload();
         return this;
@@ -174,22 +175,24 @@ class DownloadHandleImpl implements Down
     }
 
     private static void callProgressListener(ProgressListener listener, long contentLength, long progress) {
-        if (listener != null)
+        if (listener != null) {
             try {
                 listener.progress(contentLength, progress);
             }
             catch (Exception e) {
                 // ignore
             }
+        }
     }
 
     private static void callCompletionListener(ResultListener listener, DownloadResult result) {
-        if (listener != null && result != null)
+        if (listener != null && result != null) {
             try {
                 listener.completed(result);
             }
             catch (Exception e) {
                 // ignore
             }
+        }
     }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java Tue Sep 10 15:14:46 2013
@@ -21,6 +21,7 @@ package org.apache.ace.agent.impl;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.ace.agent.EventListener;
 import org.apache.ace.agent.FeedbackChannel;
@@ -45,50 +46,58 @@ public class EventLoggerImpl extends Com
     public static final String TOPIC_COMPLETE = "org/osgi/service/deployment/COMPLETE";
 
     private final BundleContext m_bundleContext;
-    private volatile boolean m_isStarted = false;
+    private final AtomicBoolean m_isStarted;
 
     public EventLoggerImpl(BundleContext bundleContext) {
         super("auditlogger");
+
         m_bundleContext = bundleContext;
+        m_isStarted = new AtomicBoolean(false);
     }
 
     @Override
-    protected void onStart() throws Exception {
+    protected void onInit() throws Exception {
         getEventsHandler().addListener(this);
-        m_bundleContext.addBundleListener(this);
-        m_bundleContext.addFrameworkListener(this);
-        m_isStarted = true;
+    }
+
+    @Override
+    protected void onStart() throws Exception {
+        if (m_isStarted.compareAndSet(false, true)) {
+            m_bundleContext.addBundleListener(this);
+            m_bundleContext.addFrameworkListener(this);
+        }
     }
 
     @Override
     protected void onStop() throws Exception {
-        m_isStarted = false;
-        getEventsHandler().removeListener(this);
-        m_bundleContext.removeBundleListener(this);
-        m_bundleContext.removeFrameworkListener(this);
+        if (m_isStarted.compareAndSet(true, false)) {
+            getEventsHandler().removeListener(this);
+
+            m_bundleContext.removeBundleListener(this);
+            m_bundleContext.removeFrameworkListener(this);
+        }
     }
 
     @Override
     public void handle(String topic, Map<String, String> payload) {
-        if (!m_isStarted) {
+        if (!m_isStarted.get()) {
             return;
         }
 
         int eventType = AuditEvent.DEPLOYMENTADMIN_BASE;
         Map<String, String> props = new HashMap<String, String>();
 
-        if (topic.equals(TOPIC_INSTALL)) {
+        if (TOPIC_INSTALL.equals(topic)) {
             String deplPackName = payload.get("deploymentpackage.name");
             eventType = AuditEvent.DEPLOYMENTADMIN_INSTALL;
             props.put(AuditEvent.KEY_NAME, deplPackName);
         }
-
-        else if (topic.equals(TOPIC_UNINSTALL)) {
+        else if (TOPIC_UNINSTALL.equals(topic)) {
             String deplPackName = payload.get("deploymentpackage.name");
             eventType = AuditEvent.DEPLOYMENTADMIN_UNINSTALL;
             props.put(AuditEvent.KEY_NAME, deplPackName);
         }
-        else if (topic.equals(TOPIC_COMPLETE)) {
+        else if (TOPIC_COMPLETE.equals(topic)) {
             eventType = AuditEvent.DEPLOYMENTADMIN_COMPLETE;
             props.put(AuditEvent.KEY_NAME, payload.get("deploymentpackage.name"));
             props.put(AuditEvent.KEY_VERSION, getDeploymentHandler().getInstalledVersion().toString());
@@ -99,7 +108,7 @@ public class EventLoggerImpl extends Com
 
     @Override
     public void bundleChanged(BundleEvent event) {
-        if (!m_isStarted) {
+        if (!m_isStarted.get()) {
             return;
         }
 
@@ -155,7 +164,7 @@ public class EventLoggerImpl extends Com
 
     @Override
     public void frameworkEvent(FrameworkEvent event) {
-        if (!m_isStarted) {
+        if (!m_isStarted.get()) {
             return;
         }
         int eventType = AuditEvent.FRAMEWORK_BASE;
@@ -221,10 +230,12 @@ public class EventLoggerImpl extends Com
             if (channel != null) {
                 channel.write(eventType, payload);
             }
+            else {
+                logWarning("Feedback event *not* written as no channel is available!");
+            }
         }
         catch (IOException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            logWarning("Failed to write feedback event!", e);
         }
     }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java Tue Sep 10 15:14:46 2013
@@ -18,15 +18,12 @@
  */
 package org.apache.ace.agent.impl;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.ace.agent.EventListener;
 import org.apache.ace.agent.EventsHandler;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.Filter;
 import org.osgi.framework.ServiceReference;
 import org.osgi.util.tracker.ServiceTracker;
 import org.osgi.util.tracker.ServiceTrackerCustomizer;
@@ -37,45 +34,20 @@ import org.osgi.util.tracker.ServiceTrac
  * {@link #removeListener(EventListener)}.
  */
 public class EventsHandlerImpl extends ComponentBase implements EventsHandler {
-
-    private final List<EventListener> m_listeners = new CopyOnWriteArrayList<EventListener>();
+    private final CopyOnWriteArrayList<EventListener> m_listeners = new CopyOnWriteArrayList<EventListener>();
     private final BundleContext m_bundleContext;
-
-    private ServiceTracker m_tracker;
+    //
+    private volatile ServiceTracker m_tracker;
 
     public EventsHandlerImpl(BundleContext bundleContext) throws Exception {
         super("events");
-        m_bundleContext = bundleContext;
-        Filter listenerFilter = m_bundleContext.createFilter("(" + Constants.OBJECTCLASS + "=" + EventListener.class.getName() + ")");
-        m_tracker = new ServiceTracker(m_bundleContext, listenerFilter, new ServiceTrackerCustomizer() {
-
-            @Override
-            public Object addingService(ServiceReference reference) {
-                Object service = m_bundleContext.getService(reference);
-                addListener((EventListener) service);
-                return service;
-            }
-
-            @Override
-            public void removedService(ServiceReference reference, Object service) {
-                removeListener((EventListener) service);
-            }
 
-            @Override
-            public void modifiedService(ServiceReference reference, Object service) {
-            }
-        });
-    }
-
-    @Override
-    protected void onStart() throws Exception {
-        m_tracker.open();
+        m_bundleContext = bundleContext;
     }
 
     @Override
-    protected void onStop() throws Exception {
-        m_tracker.close();
-        m_listeners.clear();
+    public void addListener(EventListener listener) {
+        m_listeners.addIfAbsent(listener);
     }
 
     @Override
@@ -96,8 +68,13 @@ public class EventsHandlerImpl extends C
     }
 
     @Override
-    public void sendEvent(final String topic, final Map<String, String> payload) {
-        for (final EventListener listener : m_listeners) {
+    public void removeListener(EventListener listener) {
+        m_listeners.remove(listener);
+    }
+
+    @Override
+    public void sendEvent(String topic, Map<String, String> payload) {
+        for (EventListener listener : m_listeners) {
             try {
                 listener.handle(topic, payload);
             }
@@ -108,12 +85,30 @@ public class EventsHandlerImpl extends C
     }
 
     @Override
-    public void addListener(EventListener listener) {
-        m_listeners.add(listener);
+    protected void onInit() throws Exception {
+        m_tracker = new ServiceTracker(m_bundleContext, EventListener.class.getName(), new ServiceTrackerCustomizer() {
+            @Override
+            public Object addingService(ServiceReference reference) {
+                Object service = m_bundleContext.getService(reference);
+                addListener((EventListener) service);
+                return service;
+            }
+
+            @Override
+            public void modifiedService(ServiceReference reference, Object service) {
+            }
+
+            @Override
+            public void removedService(ServiceReference reference, Object service) {
+                removeListener((EventListener) service);
+            }
+        });
+        m_tracker.open();
     }
 
     @Override
-    public void removeListener(EventListener listener) {
-        m_listeners.remove(listener);
+    protected void onStop() throws Exception {
+        m_tracker.close();
+        m_listeners.clear();
     }
 }



Mime
View raw message