karaf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [1/2] karaf-decanter git commit: [KARAF-3613] Switch to EventAdmin
Date Wed, 18 Mar 2015 08:11:07 GMT
Repository: karaf-decanter
Updated Branches:
  refs/heads/master f19041a49 -> 8b9fd6484


[KARAF-3613] Switch to EventAdmin


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/fde7be57
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/fde7be57
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/fde7be57

Branch: refs/heads/master
Commit: fde7be57163a8475b111ad9267f89299c5ad8e2b
Parents: 36f4569
Author: Christian Schneider <chris@die-schneider.net>
Authored: Sat Mar 14 11:53:24 2015 +0100
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Sat Mar 14 11:53:24 2015 +0100

----------------------------------------------------------------------
 README                                          |  4 +-
 .../org/apache/karaf/decanter/api/Appender.java | 34 --------
 .../apache/karaf/decanter/api/Collector.java    | 23 ------
 .../apache/karaf/decanter/api/Dispatcher.java   | 34 --------
 .../karaf/decanter/api/PollingCollector.java    | 34 --------
 appender/elasticsearch/pom.xml                  | 11 ++-
 .../appender/elasticsearch/Activator.java       |  9 +-
 .../elasticsearch/ElasticsearchAppender.java    | 64 +++++++-------
 appender/log/pom.xml                            | 10 +--
 .../karaf/decanter/appender/log/Activator.java  | 23 +++---
 .../decanter/appender/log/LogAppender.java      | 22 ++---
 assembly/src/main/feature/feature.xml           |  2 +-
 collector/jmx/pom.xml                           |  4 +
 .../karaf/decanter/collector/jmx/Activator.java | 42 +++++++---
 .../decanter/collector/jmx/JmxCollector.java    | 70 +++++++++-------
 collector/log/pom.xml                           |  6 --
 .../karaf/decanter/collector/log/Activator.java | 18 ++--
 .../decanter/collector/log/LogAppender.java     | 17 ++--
 .../karaf/decanter/dispatcher/Activator.java    |  9 --
 .../decanter/dispatcher/DefaultDispatcher.java  | 69 ----------------
 scheduler/simple/pom.xml                        |  4 +
 .../decanter/scheduler/simple/Activator.java    | 12 +--
 .../scheduler/simple/SimpleScheduler.java       | 87 +++++++-------------
 .../scheduler/simple/SimpleSchedulerTest.java   | 41 +++++++++
 .../decanter/simple/SimpleSchedulerTest.java    | 38 ---------
 25 files changed, 236 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/README
----------------------------------------------------------------------
diff --git a/README b/README
index f5a18e6..c7fc4ac 100644
--- a/README
+++ b/README
@@ -52,14 +52,14 @@ Apache Karaf Decanter is available as a Karaf features.
 
 You have to register the Decater features repository:
 
-karaf@root()> feature:repo-add mvn:org.apache.karaf.decanter/decanter/3.0.0-SNAPSHOT/xml/features
+karaf@root()> feature:repo-add mvn:org.apache.karaf.decanter/apache-karaf-decanter/3.0.0-SNAPSHOT/xml/features
 
 It's up to you to choose the features to install, depending of the systems that you want:
 
 * decanter-simple-scheduler
 This feature installs a very simple Decanter Scheduler using a Thread.
 
-karaf@root()> feature:install decanter-simple-schedular
+karaf@root()> feature:install decanter-simple-scheduler
 
 * decanter-collector-log
 This feature installs a Decanter Collector listening for all log messages happening in Karaf.

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/api/src/main/java/org/apache/karaf/decanter/api/Appender.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/Appender.java b/api/src/main/java/org/apache/karaf/decanter/api/Appender.java
deleted file mode 100644
index 37c8dd5..0000000
--- a/api/src/main/java/org/apache/karaf/decanter/api/Appender.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.api;
-
-import java.util.Map;
-
-/**
- * Append collected data to a target system
- */
-public interface Appender {
-
-    /**
-     * Append data to a target system.
-     *
-     * @param data the data to dispatch.
-     * @throws Exception in case of appending failure.
-     */
-    public void append(Map<Long, Map<String, Object>> data) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/api/src/main/java/org/apache/karaf/decanter/api/Collector.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/Collector.java b/api/src/main/java/org/apache/karaf/decanter/api/Collector.java
deleted file mode 100644
index 0a58ab4..0000000
--- a/api/src/main/java/org/apache/karaf/decanter/api/Collector.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.api;
-
-/**
- * Generic decanter collector (can be event driven or polling collector)/
- */
-public interface Collector {
-}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/api/src/main/java/org/apache/karaf/decanter/api/Dispatcher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/Dispatcher.java b/api/src/main/java/org/apache/karaf/decanter/api/Dispatcher.java
deleted file mode 100644
index ae822b8..0000000
--- a/api/src/main/java/org/apache/karaf/decanter/api/Dispatcher.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.api;
-
-import java.util.Map;
-
-/**
- * Describe the dispatcher service responsible to calling all appender services available.
- */
-public interface Dispatcher {
-
-    /**
-     * Call all appender services available to dispatch collected data.
-     *
-     * @param data the collected data to dispatch.
-     * @throws Exception in case of appending failure.
-     */
-    public void dispatch(Map<Long, Map<String, Object>> data) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/api/src/main/java/org/apache/karaf/decanter/api/PollingCollector.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/PollingCollector.java b/api/src/main/java/org/apache/karaf/decanter/api/PollingCollector.java
deleted file mode 100644
index 9bfa657..0000000
--- a/api/src/main/java/org/apache/karaf/decanter/api/PollingCollector.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.api;
-
-import java.util.Map;
-
-/**
- * Interface describing a Decanter polling collector service (for instance log messages, JMX metrics, etc)
- */
-public interface PollingCollector extends Collector {
-
-    /**
-     * Collect data to send to the appender.
-     *
-     * @return the list of collected data.
-     * @throws Exception in case of collection failure.
-     */
-    public Map<Long, Map<String, Object>> collect() throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/appender/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/appender/elasticsearch/pom.xml b/appender/elasticsearch/pom.xml
index 09291dc..9cf873f 100644
--- a/appender/elasticsearch/pom.xml
+++ b/appender/elasticsearch/pom.xml
@@ -29,12 +29,6 @@
 
 	<dependencies>
 
-		<!-- Decanter -->
-		<dependency>
-			<groupId>org.apache.karaf.decanter</groupId>
-			<artifactId>org.apache.karaf.decanter.api</artifactId>
-		</dependency>
-
 		<!-- Elasticsearch -->
 		<dependency>
 			<groupId>org.elasticsearch</groupId>
@@ -47,6 +41,11 @@
 			<groupId>org.osgi</groupId>
 			<artifactId>org.osgi.core</artifactId>
 		</dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
+        
 
 		<!-- JSON builder -->
 		<dependency>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
----------------------------------------------------------------------
diff --git a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
index f43d137..553d778 100644
--- a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
+++ b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
@@ -19,9 +19,10 @@ package org.apache.karaf.decanter.appender.elasticsearch;
 import java.util.Dictionary;
 import java.util.Hashtable;
 
-import org.apache.karaf.decanter.api.Appender;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 
 public class Activator implements BundleActivator {
 
@@ -32,12 +33,12 @@ public class Activator implements BundleActivator {
         appender = new ElasticsearchAppender("localhost", 9300);
         appender.open();
         Dictionary<String, String> properties = new Hashtable<>();
-        properties.put("name", "elasticsearch");
-        bundleContext.registerService(Appender.class, appender, properties);
+        properties.put(EventConstants.EVENT_TOPIC, "decanter/*");
+        bundleContext.registerService(EventHandler.class, appender, properties);
     }
 
     public void stop(BundleContext bundleContext) {
-        appender.close();;
+        appender.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
----------------------------------------------------------------------
diff --git a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
index ab1d791..ce9f517 100644
--- a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
+++ b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
@@ -20,6 +20,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TimeZone;
@@ -28,18 +29,19 @@ import javax.json.Json;
 import javax.json.JsonObject;
 import javax.json.JsonObjectBuilder;
 
-import org.apache.karaf.decanter.api.Appender;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Karaf Decanter appender which inserts into Elasticsearch
  */
-public class ElasticsearchAppender implements Appender {
+public class ElasticsearchAppender implements EventHandler {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class);
 
@@ -75,27 +77,29 @@ public class ElasticsearchAppender implements Appender {
         client.close();
     }
 
-    public void append(Map<Long, Map<String, Object>> data) throws Exception {
+    @Override
+    public void handleEvent(Event event) {
         try {
-            for (Entry<Long, Map<String, Object>> entry : data.entrySet()) {
-                send(client, new Date(entry.getKey()), entry.getValue());
-            }
+            send(client, event);
         } catch (Exception e) {
             LOGGER.warn("Can't append into Elasticsearch", e);
         }
     }
 
-    private void send(Client client, Date date, Map<String, Object> props) {
-        props.put("@timestamp", tsFormat.format(date));
+    @SuppressWarnings("unchecked")
+    private void send(Client client, Event event) {
+        Map<String, Object> props = new HashMap<>();
+        Long ts = (Long)event.getProperty("timestamp");
+        Date date = ts != null ? new Date((Long)ts) : new Date();
+        
         JsonObjectBuilder jsonObjectBuilder = Json.createObjectBuilder();
-        for (Entry<String, Object> valueEntry : props.entrySet()) {
-            Object value = valueEntry.getValue();
+        jsonObjectBuilder.add("@timestamp", tsFormat.format(date));
+        for (String key : event.getPropertyNames()) {
+            Object value = event.getProperty(key);
             if (value instanceof String) {
-                jsonObjectBuilder.add(valueEntry.getKey(), (String)value);
+                jsonObjectBuilder.add(key, (String) value);
             } else if (value instanceof Map) {
-                @SuppressWarnings("unchecked")
-                JsonObject jsonO = asJson(jsonObjectBuilder, (Map<String, Object>)value);
-                jsonObjectBuilder.add(valueEntry.getKey(), jsonO);
+                jsonObjectBuilder.add(key, build((Map<String, Object>) value));
             }
         }
         JsonObject jsonObject = jsonObjectBuilder.build();
@@ -103,25 +107,27 @@ public class ElasticsearchAppender implements Appender {
         client.prepareIndex(indexName, "karaf_event").setSource(jsonObject.toString()).execute().actionGet();
     }
 
-    private String getIndexName(String prefix, Date date) {
-        return prefix + "-" + indexDateFormat.format(date);
-    }
-
-    private JsonObject asJson(JsonObjectBuilder jsonObjectBuilder, Map<String, Object> value) {
+    private JsonObject build(Map<String, Object> value) {
         JsonObjectBuilder innerBuilder = Json.createObjectBuilder();
         for (Entry<String, Object> innerEntrySet : value.entrySet()) {
-            String key = innerEntrySet.getKey();
-            Object object = innerEntrySet.getValue();
-            if (object instanceof String)
-                innerBuilder.add(key, (String)object);
-            else if (object instanceof Long)
-                innerBuilder.add(key, (Long)object);
-            else if (object instanceof Integer)
-                innerBuilder.add(key, (Integer)object);
-            else if (object instanceof Float)
-                innerBuilder.add(key, (Float)object);
+            addProperty(innerBuilder, innerEntrySet.getKey(), innerEntrySet.getValue());
         }
         return innerBuilder.build();
     }
 
+    private void addProperty(JsonObjectBuilder innerBuilder, String innerKey, Object innerValue) {
+        if (innerValue instanceof String)
+            innerBuilder.add(innerKey, (String) innerValue);
+        else if (innerValue instanceof Long)
+            innerBuilder.add(innerKey, (Long) innerValue);
+        else if (innerValue instanceof Integer)
+            innerBuilder.add(innerKey, (Integer) innerValue);
+        else if (innerValue instanceof Float)
+            innerBuilder.add(innerKey, (Float) innerValue);
+    }
+
+    private String getIndexName(String prefix, Date date) {
+        return prefix + "-" + indexDateFormat.format(date);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/appender/log/pom.xml
----------------------------------------------------------------------
diff --git a/appender/log/pom.xml b/appender/log/pom.xml
index 2e93e11..788555f 100644
--- a/appender/log/pom.xml
+++ b/appender/log/pom.xml
@@ -35,17 +35,15 @@
 
     <dependencies>
 
-        <!-- Decanter API -->
-        <dependency>
-            <groupId>org.apache.karaf.decanter</groupId>
-            <artifactId>org.apache.karaf.decanter.api</artifactId>
-        </dependency>
-
         <!-- OSGi -->
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
 
         <!-- SLF4J -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java
----------------------------------------------------------------------
diff --git a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java
index c244d84..789ce8c 100644
--- a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java
+++ b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java
@@ -16,29 +16,26 @@
  */
 package org.apache.karaf.decanter.appender.log;
 
-import org.apache.karaf.decanter.api.Appender;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Properties;
+
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
-
-import java.util.Dictionary;
-import java.util.Properties;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 
 public class Activator implements BundleActivator {
 
-    private ServiceRegistration service;
-
     public void start(BundleContext bundleContext) {
-        Appender appender = new LogAppender();
-        Properties properties = new Properties();
-        properties.put("name", "log");
-        service = bundleContext.registerService(Appender.class, appender, (Dictionary) properties);
+        LogAppender appender = new LogAppender();
+        Dictionary<String, String> properties = new Hashtable<>();
+        properties.put(EventConstants.EVENT_TOPIC, "decanter/events/*");
+        bundleContext.registerService(EventHandler.class, appender, properties);
     }
 
     public void stop(BundleContext bundleContext) {
-        if (service != null) {
-            service.unregister();
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
----------------------------------------------------------------------
diff --git a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
index af60665..742dc18 100644
--- a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
+++ b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
@@ -16,29 +16,25 @@
  */
 package org.apache.karaf.decanter.appender.log;
 
-import org.apache.karaf.decanter.api.Appender;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * Karaf Decanter Log Appender, logging the collected data using.
  */
-public class LogAppender implements Appender {
+public class LogAppender implements EventHandler {
 
     private final Logger LOGGER = LoggerFactory.getLogger(LogAppender.class);
 
-    public void append(Map<Long, Map<String, Object>> data) {
-        for (Long key : data.keySet()) {
-            Map<String, Object> inner = data.get(key);
-            StringBuilder builder = new StringBuilder();
-            builder.append(key).append(" - ");
-            for (String innerKey : inner.keySet()) {
-                builder.append(innerKey).append(":").append(inner.get(innerKey).toString()).append(" | ");
-            }
-            LOGGER.info(builder.toString());
+    @Override
+    public void handleEvent(Event event) {
+        StringBuilder builder = new StringBuilder();
+        for (String innerKey : event.getPropertyNames()) {
+            builder.append(innerKey).append(":").append(event.getProperty(innerKey).toString()).append(" | ");
         }
+        LOGGER.info(builder.toString());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/assembly/src/main/feature/feature.xml
----------------------------------------------------------------------
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index 98eba4a..933a08c 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -19,8 +19,8 @@
 <features name="karaf-decanter-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.1 http://karaf.apache.org/xmlns/features/v1.2.1">
 
     <feature name="decanter-common" version="${project.version}" description="Karaf Decanter API">
+        <feature>eventadmin</feature>
         <bundle>mvn:org.apache.karaf.decanter/org.apache.karaf.decanter.api/${project.version}</bundle>
-        <bundle>mvn:org.apache.karaf.decanter/org.apache.karaf.decanter.dispatcher/${project.version}</bundle>
     </feature>
 
     <feature name="decanter-simple-scheduler" version="${project.version}" description="Karaf Decanter Simple Scheduler">

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/collector/jmx/pom.xml
----------------------------------------------------------------------
diff --git a/collector/jmx/pom.xml b/collector/jmx/pom.xml
index fa74e18..bf4357b 100644
--- a/collector/jmx/pom.xml
+++ b/collector/jmx/pom.xml
@@ -46,6 +46,10 @@
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
 
         <!-- SLF4J -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java
----------------------------------------------------------------------
diff --git a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java
index 277cb7a..d55dd3f 100644
--- a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java
+++ b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java
@@ -16,29 +16,45 @@
  */
 package org.apache.karaf.decanter.collector.jmx;
 
-import org.apache.karaf.decanter.api.PollingCollector;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.util.tracker.ServiceTracker;
 
-import java.util.Dictionary;
-import java.util.Properties;
-
+@SuppressWarnings("rawtypes")
 public class Activator implements BundleActivator {
 
-    private ServiceRegistration service;
+    private ServiceTracker<EventAdmin, ServiceRegistration> tracker;
+
+    public void start(final BundleContext bundleContext) throws Exception {
+        tracker = new ServiceTracker<EventAdmin, ServiceRegistration>(bundleContext, EventAdmin.class, null) {
+
+            @Override
+            public ServiceRegistration<?> addingService(ServiceReference<EventAdmin> reference) {
+                EventAdmin eventAdmin = bundleContext.getService(reference);
+                JmxCollector collector = new JmxCollector(eventAdmin);
+                Dictionary<String, String> properties = new Hashtable<String, String>();
+                properties.put("decanter.collector.name", "jmx");
+                return bundleContext.registerService(Runnable.class, collector, properties);
+            }
 
-    public void start(BundleContext bundleContext) throws Exception {
-        JmxCollector collector = new JmxCollector();
-        Properties properties = new Properties();
-        properties.put("name", "jmx");
-        service = bundleContext.registerService(PollingCollector.class, collector, (Dictionary) properties);
+            @Override
+            public void removedService(ServiceReference<EventAdmin> reference, ServiceRegistration reg) {
+                reg.unregister();
+                super.removedService(reference, reg);
+            }
+            
+        };
+        tracker.open();
     }
 
     public void stop(BundleContext bundleContext) throws Exception {
-        if (service != null) {
-            service.unregister();
-        }
+        tracker.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
----------------------------------------------------------------------
diff --git a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
index 6fb5dd9..5f04026 100644
--- a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
+++ b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
@@ -27,56 +27,66 @@ import javax.management.ObjectName;
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.CompositeType;
 
-import org.apache.karaf.decanter.api.PollingCollector;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Decanter JMX Pooling Collector
  */
-public class JmxCollector implements PollingCollector {
+public class JmxCollector implements Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(JmxCollector.class);
+    private EventAdmin eventAdmin;
 
-    public Map<Long, Map<String, Object>> collect() throws Exception {
-        LOGGER.debug("Karaf Decanter JMX Collector starts harvesting ...");
+    public JmxCollector(EventAdmin eventAdmin) {
+        this.eventAdmin = eventAdmin;
+    }
 
-        Map<Long, Map<String, Object>> collected = new HashMap<>();
+    @Override
+    public void run() {
+        LOGGER.debug("Karaf Decanter JMX Collector starts harvesting ...");
 
         // TODO be able to pool remote JMX
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         Set<ObjectName> names = server.queryNames(null, null);
         for (ObjectName name : names) {
-            MBeanAttributeInfo[] attributes = server.getMBeanInfo(name).getAttributes();
-            Map<String, Object> data = new HashMap<>();
-            data.put("mbean", name.toString());
-            for (MBeanAttributeInfo attribute : attributes) {
-                // TODO add SLA check on attributes and filtering
-                try {
-                    Object attributeObject = server.getAttribute(name, attribute.getName());
-                    if (attributeObject instanceof String) {
-                    	data.put(attribute.getName(), (String) attributeObject);
-                    } else if (attributeObject instanceof CompositeDataSupport) {
-                    	CompositeDataSupport cds = (CompositeDataSupport) attributeObject;
-                    	CompositeType compositeType = cds.getCompositeType();
-                    	Set<String> keySet = compositeType.keySet();
-                    	Map<String, Object> composite = new HashMap<String, Object>();
-                    	for (String key : keySet) {
-							Object cdsObject = cds.get(key);
-							composite.put(key, cdsObject);
-						}
-                    	data.put(attribute.getName(), composite);
-                    }
-                } catch (Exception e) {
-                    // LOGGER.warn("Can't put MBean {} attribute {} in collected data", name.toString(), attribute.getName(), e);
-                }
+            try {
+                Map<String, Object> data = harvestBean(server, name);
+                Event event = new Event("decanter/jmx", data);
+                eventAdmin.postEvent(event);
+            } catch (Exception e) {
+                LOGGER.warn("Error reading mbean " + name, e);
             }
-            collected.put(System.currentTimeMillis(), data);
         }
 
         LOGGER.debug("Karaf Decanter JMX Collector harvesting done");
+    }
 
-        return collected;
+    private Map<String, Object> harvestBean(MBeanServer server, ObjectName name) throws Exception {
+        MBeanAttributeInfo[] attributes = server.getMBeanInfo(name).getAttributes();
+        Map<String, Object> data = new HashMap<>();
+        data.put("mbean", name.toString());
+        for (MBeanAttributeInfo attribute : attributes) {
+            // TODO add SLA check on attributes and filtering
+            Object attributeObject = server.getAttribute(name, attribute.getName());
+            if (attributeObject instanceof String) {
+                data.put(attribute.getName(), (String)attributeObject);
+            } else if (attributeObject instanceof CompositeDataSupport) {
+                CompositeDataSupport cds = (CompositeDataSupport)attributeObject;
+                CompositeType compositeType = cds.getCompositeType();
+                Set<String> keySet = compositeType.keySet();
+                Map<String, Object> composite = new HashMap<String, Object>();
+                for (String key : keySet) {
+                    Object cdsObject = cds.get(key);
+                    composite.put(key, cdsObject);
+                }
+                data.put(attribute.getName(), composite);
+            }
+
+        }
+        return data;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/collector/log/pom.xml
----------------------------------------------------------------------
diff --git a/collector/log/pom.xml b/collector/log/pom.xml
index ee6fc59..898ce55 100644
--- a/collector/log/pom.xml
+++ b/collector/log/pom.xml
@@ -35,12 +35,6 @@
 
     <dependencies>
 
-        <!-- Decanter API -->
-        <dependency>
-            <groupId>org.apache.karaf.decanter</groupId>
-            <artifactId>org.apache.karaf.decanter.api</artifactId>
-        </dependency>
-
         <!-- Pax Logging -->
         <dependency>
             <groupId>org.ops4j.pax.logging</groupId>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java
----------------------------------------------------------------------
diff --git a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java
index c358b4d..f1a581d 100644
--- a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java
+++ b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java
@@ -19,36 +19,34 @@ package org.apache.karaf.decanter.collector.log;
 import java.util.Dictionary;
 import java.util.Properties;
 
-import org.apache.karaf.decanter.api.Collector;
-import org.apache.karaf.decanter.api.Dispatcher;
 import org.ops4j.pax.logging.spi.PaxAppender;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.EventAdmin;
 import org.osgi.util.tracker.ServiceTracker;
 
 @SuppressWarnings("rawtypes")
 public class Activator implements BundleActivator {
-    private ServiceTracker<Dispatcher, ServiceRegistration> tracker;
+    private ServiceTracker<EventAdmin, ServiceRegistration> tracker;
 
     public void start(final BundleContext bundleContext) {
-        tracker = new ServiceTracker<Dispatcher, ServiceRegistration>(bundleContext, Dispatcher.class, null) {
+        tracker = new ServiceTracker<EventAdmin, ServiceRegistration>(bundleContext, EventAdmin.class, null) {
 
             @SuppressWarnings("unchecked")
             @Override
-            public ServiceRegistration<?> addingService(ServiceReference<Dispatcher> reference) {
+            public ServiceRegistration<?> addingService(ServiceReference<EventAdmin> reference) {
                 Properties properties = new Properties();
                 properties.put("org.ops4j.pax.logging.appender.name", "DecanterLogCollectorAppender");
                 properties.put("name", "log");
-                String[] ifAr = new String[] { PaxAppender.class.getName(), Collector.class.getName() };
-                Dispatcher dispatcher = bundleContext.getService(reference);
-                LogAppender appender = new LogAppender(dispatcher);
-                return bundleContext.registerService(ifAr , appender, (Dictionary) properties);
+                EventAdmin eventAdmin = bundleContext.getService(reference);
+                LogAppender appender = new LogAppender(eventAdmin);
+                return bundleContext.registerService(PaxAppender.class , appender, (Dictionary) properties);
             }
 
             @Override
-            public void removedService(ServiceReference<Dispatcher> reference, ServiceRegistration reg) {
+            public void removedService(ServiceReference<EventAdmin> reference, ServiceRegistration reg) {
                 reg.unregister();
                 super.removedService(reference, reg);
             }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
----------------------------------------------------------------------
diff --git a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
index 7b27aab..0b52e30 100644
--- a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
+++ b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
@@ -19,24 +19,24 @@ package org.apache.karaf.decanter.collector.log;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.karaf.decanter.api.Collector;
-import org.apache.karaf.decanter.api.Dispatcher;
 import org.apache.log4j.MDC;
 import org.ops4j.pax.logging.spi.PaxAppender;
 import org.ops4j.pax.logging.spi.PaxLoggingEvent;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Decanter log collector, event driven implementing a PaxAppender
  */
-public class LogAppender implements PaxAppender, Collector {
+public class LogAppender implements PaxAppender {
     private static final String MDC_IN_LOG_APPENDER = "inLogAppender";
     private final static String[] ignoredCategories = {"org.apache.karaf.decanter"};
     private final static Logger LOGGER = LoggerFactory.getLogger(LogAppender.class);
-    private Dispatcher dispatcher;
+    private EventAdmin dispatcher;
     
-    public LogAppender(Dispatcher dispatcher) {
+    public LogAppender(EventAdmin dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -58,8 +58,8 @@ public class LogAppender implements PaxAppender, Collector {
     private void appendInternal(PaxLoggingEvent event) throws Exception {
         LOGGER.debug("Karaf Decanter Log Collector hooked ...");
 
-        Map<Long, Map<String, Object>> collected = new HashMap<>();
         Map<String, Object> data = new HashMap<>();
+        data.put("timeStamp", event.getTimeStamp());
         data.put("loggerClass", event.getFQNOfLoggerClass());
         data.put("loggerName", event.getLoggerName());
         data.put("threadName", event.getThreadName());
@@ -67,11 +67,10 @@ public class LogAppender implements PaxAppender, Collector {
         data.put("level", event.getLevel().toString());
         data.put("renderedMessage", event.getRenderedMessage());
         data.put("MDC", event.getProperties());
-        collected.put(event.getTimeStamp(), data);
 
         if (!isIgnored(event.getLoggerName())) {
-            LOGGER.debug("Calling the Karaf Decanter Appender Controller ...");
-            this.dispatcher.dispatch(collected);
+            String topic = "decanter/log/" + event.getLoggerName().replace(".", "/");
+            this.dispatcher.postEvent(new Event(topic, data));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java
----------------------------------------------------------------------
diff --git a/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java b/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java
index d709892..f2a2919 100644
--- a/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java
+++ b/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java
@@ -16,24 +16,15 @@
  */
 package org.apache.karaf.decanter.dispatcher;
 
-import org.apache.karaf.decanter.api.Dispatcher;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
 
 public class Activator implements BundleActivator {
 
-    private ServiceRegistration service;
-
     public void start(BundleContext bundleContext) {
-        Dispatcher dispatcher = new DefaultDispatcher(bundleContext);
-        service = bundleContext.registerService(Dispatcher.class, dispatcher, null);
     }
 
     public void stop(BundleContext bundleContext) {
-        if (service != null) {
-            service.unregister();
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/DefaultDispatcher.java
----------------------------------------------------------------------
diff --git a/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/DefaultDispatcher.java b/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/DefaultDispatcher.java
deleted file mode 100644
index e8cb364..0000000
--- a/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/DefaultDispatcher.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.dispatcher;
-
-import org.apache.karaf.decanter.api.Appender;
-import org.apache.karaf.decanter.api.Dispatcher;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Default dispatcher
- */
-public class DefaultDispatcher implements Dispatcher {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(DefaultDispatcher.class);
-
-    private BundleContext bundleContext;
-
-    public DefaultDispatcher(BundleContext bundleContext) {
-        this.bundleContext = bundleContext;
-    }
-
-    public void dispatch(Map<Long, Map<String, Object>> data) throws Exception {
-        LOGGER.debug("Dispatching collected data");
-
-        Collection<ServiceReference<Appender>> references = bundleContext.getServiceReferences(Appender.class, null);
-        if (references != null) {
-            for (ServiceReference reference : references) {
-                try {
-                    Appender appender = (Appender) bundleContext.getService(reference);
-                    appender.append(data);
-                } catch (Exception e) {
-                    LOGGER.warn("Can't dispatch collected data", e);
-                } finally {
-                    bundleContext.ungetService(reference);
-                }
-            }
-        }
-        LOGGER.debug("Dispatching done");
-    }
-
-    public BundleContext getBundleContext() {
-        return bundleContext;
-    }
-
-    public void setBundleContext(BundleContext bundleContext) {
-        this.bundleContext = bundleContext;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/scheduler/simple/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/simple/pom.xml b/scheduler/simple/pom.xml
index 0ca78a6..fcc8a9f 100644
--- a/scheduler/simple/pom.xml
+++ b/scheduler/simple/pom.xml
@@ -50,6 +50,10 @@
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
 
         <!-- test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java
----------------------------------------------------------------------
diff --git a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java
index d723eb5..2349e41 100644
--- a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java
+++ b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java
@@ -19,25 +19,19 @@ package org.apache.karaf.decanter.scheduler.simple;
 import org.apache.karaf.decanter.api.Scheduler;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
 
 public class Activator implements BundleActivator {
 
     private SimpleScheduler scheduler;
-    private ServiceRegistration service;
 
     public void start(BundleContext bundleContext) {
         scheduler = new SimpleScheduler(bundleContext);
-        service = bundleContext.registerService(Scheduler.class, scheduler, null);
+        scheduler.start();
+        bundleContext.registerService(Scheduler.class, scheduler, null);
     }
 
     public void stop(BundleContext bundleContext) {
-        if (scheduler != null) {
-            scheduler.stop();
-        }
-        if (service != null) {
-            service.unregister();
-        }
+        scheduler.stop();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
index e42cf98..dd1ee99 100644
--- a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
+++ b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
@@ -16,78 +16,52 @@
  */
 package org.apache.karaf.decanter.scheduler.simple;
 
-import org.apache.karaf.decanter.api.Dispatcher;
-import org.apache.karaf.decanter.api.PollingCollector;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.karaf.decanter.api.Scheduler;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.util.tracker.ServiceTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * Very simple Decanter scheduler using a single thread.
  */
 public class SimpleScheduler implements Runnable, Scheduler {
-
     private final static Logger LOGGER = LoggerFactory.getLogger(SimpleScheduler.class);
 
-    private BundleContext bundleContext;
     private AtomicBoolean running = new AtomicBoolean(false);
     private long interval = 30000L;
-
-    public SimpleScheduler() {
-        this.start();
+    ServiceTracker<Runnable, Runnable> collectors;
+    
+    SimpleScheduler() {
     }
-
+    
     public SimpleScheduler(BundleContext bundleContext) {
-        this.bundleContext = bundleContext;
-        this.start();
+        this.collectors = new ServiceTracker<>(bundleContext, collectorFilter(bundleContext), null);
+        this.collectors.open();
+    }
+
+    private Filter collectorFilter(BundleContext bundleContext) {
+        try {
+            return bundleContext.createFilter(String.format("(&(objectClass=%s)(decanter.collector.name=*))", Runnable.class.getName()));
+        } catch (InvalidSyntaxException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void run() {
         LOGGER.debug("Decanter SimpleScheduler thread started ...");
 
         while (running.get()) {
-            Map<Long, Map<String, Object>> collected = new HashMap<>();
-            try {
-                LOGGER.debug("Calling the collectors ...");
-                Collection<ServiceReference<PollingCollector>> references = bundleContext.getServiceReferences(PollingCollector.class, null);
-                if (references != null) {
-                    for (ServiceReference<PollingCollector> reference : references) {
-                        try {
-                            if (reference != null) {
-                                PollingCollector collector = bundleContext.getService(reference);
-                                Map<Long, Map<String, Object>> data = collector.collect();
-                                collected.putAll(data);
-                            }
-                        } catch (Exception e) {
-                            LOGGER.warn("Can't collect data", e);
-                        } finally {
-                            bundleContext.ungetService(reference);
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                LOGGER.warn("Can't get polling collector services", e);
-            }
-            ServiceReference<Dispatcher> reference = null;
-            try {
-                LOGGER.debug("Calling the dispatcher ...");
-                reference = bundleContext.getServiceReference(Dispatcher.class);
-                if (reference != null) {
-                    Dispatcher dispatcher = bundleContext.getService(reference);
-                    dispatcher.dispatch(collected);
-                }
-            } catch (Exception e) {
-                LOGGER.warn("Can't dispatch using the controller", e);
-            } finally {
-                if (reference != null) {
-                    bundleContext.ungetService(reference);
+            LOGGER.debug("Calling the collectors ...");
+            for (Runnable collector : collectors.getServices(new Runnable[] {})) {
+                try {
+                    collector.run();
+                } catch (Exception e) {
+                    LOGGER.warn("Can't collect data", e);
                 }
             }
             try {
@@ -102,6 +76,9 @@ public class SimpleScheduler implements Runnable, Scheduler {
 
     public void stop() {
         running.set(false);
+        if (collectors != null) {
+            this.collectors.close();
+        }
     }
 
     public void start() {
@@ -123,12 +100,4 @@ public class SimpleScheduler implements Runnable, Scheduler {
         }
     }
 
-    public BundleContext getBundleContext() {
-        return bundleContext;
-    }
-
-    public void setBundleContext(BundleContext bundleContext) {
-        this.bundleContext = bundleContext;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/scheduler/simple/src/test/java/org/apache/karaf/decanter/scheduler/simple/SimpleSchedulerTest.java
----------------------------------------------------------------------
diff --git a/scheduler/simple/src/test/java/org/apache/karaf/decanter/scheduler/simple/SimpleSchedulerTest.java b/scheduler/simple/src/test/java/org/apache/karaf/decanter/scheduler/simple/SimpleSchedulerTest.java
new file mode 100644
index 0000000..3d86976
--- /dev/null
+++ b/scheduler/simple/src/test/java/org/apache/karaf/decanter/scheduler/simple/SimpleSchedulerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.scheduler.simple;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.karaf.decanter.api.Scheduler;
+import org.apache.karaf.decanter.scheduler.simple.SimpleScheduler;
+import org.junit.Test;
+
+public class SimpleSchedulerTest {
+
+    @Test
+    public void testSimpleScheduler() throws Exception {
+        Scheduler scheduler = new SimpleScheduler();
+        scheduler.start();
+        assertTrue(scheduler.isStarted());
+        assertEquals("Started", scheduler.state());
+
+        scheduler.stop();
+        assertFalse(scheduler.isStarted());
+        assertEquals("Stopped", scheduler.state());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/fde7be57/scheduler/simple/src/test/java/org/apache/karaf/decanter/simple/SimpleSchedulerTest.java
----------------------------------------------------------------------
diff --git a/scheduler/simple/src/test/java/org/apache/karaf/decanter/simple/SimpleSchedulerTest.java b/scheduler/simple/src/test/java/org/apache/karaf/decanter/simple/SimpleSchedulerTest.java
deleted file mode 100644
index dda757a..0000000
--- a/scheduler/simple/src/test/java/org/apache/karaf/decanter/simple/SimpleSchedulerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.simple;
-
-import org.apache.karaf.decanter.api.Scheduler;
-import org.apache.karaf.decanter.scheduler.simple.SimpleScheduler;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class SimpleSchedulerTest {
-
-    @Test
-    public void testSimpleScheduler() throws Exception {
-        Scheduler scheduler = new SimpleScheduler();
-        assertTrue(scheduler.isStarted());
-        assertEquals("Started", scheduler.state());
-
-        scheduler.stop();
-        assertFalse(scheduler.isStarted());
-        assertEquals("Stopped", scheduler.state());
-    }
-
-}


Mime
View raw message