aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [44/50] [abbrv] aries-rsa git commit: Switching to aries package names
Date Fri, 11 Mar 2016 19:43:43 GMT
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
new file mode 100644
index 0000000..02d9674
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/Activator.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aries.rsa.spi.ExportPolicy;
+import org.apache.aries.rsa.topologymanager.exporter.DefaultExportPolicy;
+import org.apache.aries.rsa.topologymanager.exporter.EndpointListenerNotifier;
+import org.apache.aries.rsa.topologymanager.exporter.EndpointRepository;
+import org.apache.aries.rsa.topologymanager.exporter.TopologyManagerExport;
+import org.apache.aries.rsa.topologymanager.importer.TopologyManagerImport;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceEvent;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Activator implements BundleActivator {
+    public static final String RSA_EXPORT_POLICY_FILTER = "rsa.export.policy.filter";
+    static final String DOSGI_SERVICES = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)";
+    private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
+
+    private TopologyManagerExport exportManager;
+    private TopologyManagerImport importManager;
+    private EndpointListenerNotifier notifier;
+    private ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> rsaTracker;
+    private ThreadPoolExecutor exportExecutor;
+    private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker;
+    private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker;
+
+    public void start(final BundleContext bc) throws Exception {
+        Dictionary<String, String> props = new Hashtable<String, String>();
+        props.put("name", "default");
+        bc.registerService(ExportPolicy.class, new DefaultExportPolicy(), props);
+
+        Filter policyFilter = exportPolicyFilter(bc);
+        policyTracker = new ServiceTracker<ExportPolicy, ExportPolicy>(bc, policyFilter, null) {
+
+            @Override
+            public ExportPolicy addingService(ServiceReference<ExportPolicy> reference) {
+                ExportPolicy policy = super.addingService(reference);
+                if (exportManager == null) {
+                    doStart(bc, policy);
+                }
+                return policy;
+            }
+
+            @Override
+            public void removedService(ServiceReference<ExportPolicy> reference, ExportPolicy service) {
+                if (exportManager != null) {
+                    doStop(bc);
+                }
+                super.removedService(reference, service);
+            }
+        };
+        policyTracker.open();
+    }
+
+    private Filter exportPolicyFilter(BundleContext bc) throws InvalidSyntaxException {
+        String filter = bc.getProperty(RSA_EXPORT_POLICY_FILTER);
+        if (filter == null) {
+            filter = "(name=default)";
+        }
+        return FrameworkUtil.createFilter(String.format("(&(objectClass=%s)%s)", ExportPolicy.class.getName(), filter));
+    }
+
+    public void doStart(final BundleContext bc, ExportPolicy policy) {
+        LOG.debug("TopologyManager: start()");
+        EndpointRepository endpointRepo = new EndpointRepository();
+        notifier = new EndpointListenerNotifier(endpointRepo);
+        epListenerTracker = new EndpointListenerTracker(bc);
+        endpointRepo.setNotifier(notifier);
+        exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy);
+        importManager = new TopologyManagerImport(bc);
+        rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null);
+        bc.addServiceListener(exportManager);
+        rsaTracker.open();
+        epListenerTracker.open();
+        exportExistingServices(bc);
+        importManager.start();
+    }
+
+    public void stop(BundleContext bc) throws Exception {
+        policyTracker.close();
+    }
+
+    public void doStop(BundleContext bc) {
+        LOG.debug("TopologyManager: stop()");
+        epListenerTracker.close();
+        bc.removeServiceListener(exportManager);
+        exportExecutor.shutdown();
+        importManager.stop();
+        rsaTracker.close();
+        exportManager = null;
+    }
+
+    public void exportExistingServices(BundleContext context) {
+        try {
+            // cast to String is necessary for compiling against OSGi core version >= 4.3
+            ServiceReference<?>[] references = context.getServiceReferences((String)null, DOSGI_SERVICES);
+            if (references != null) {
+                for (ServiceReference<?> sref : references) {
+                    exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
+                }
+            }
+        } catch (InvalidSyntaxException e) {
+            LOG.error("Error in filter {}. This should not occur!", DOSGI_SERVICES);
+        }
+    }
+    
+    private final class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
+        private EndpointListenerTracker(BundleContext context) {
+            super(context, EndpointListener.class, null);
+        }
+
+        @Override
+        public EndpointListener addingService(ServiceReference<EndpointListener> reference) {
+            EndpointListener listener = super.addingService(reference);
+            notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
+            return listener;
+        }
+
+        @Override
+        public void modifiedService(ServiceReference<EndpointListener> reference,
+                                    EndpointListener listener) {
+            super.modifiedService(reference, listener);
+            notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
+        }
+
+        @Override
+        public void removedService(ServiceReference<EndpointListener> reference,
+                                   EndpointListener listener) {
+            notifier.remove(listener);
+            super.removedService(reference, listener);
+        }
+    }
+
+    private final class RSATracker extends ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> {
+        private RSATracker(BundleContext context, Class<RemoteServiceAdmin> clazz,
+                           ServiceTrackerCustomizer<RemoteServiceAdmin, RemoteServiceAdmin> customizer) {
+            super(context, clazz, customizer);
+        }
+
+        @Override
+        public RemoteServiceAdmin addingService(ServiceReference<RemoteServiceAdmin> reference) {
+            RemoteServiceAdmin rsa = super.addingService(reference);
+            LOG.debug("New RemoteServiceAdmin {} detected, trying to import and export services with it", rsa);
+            importManager.add(rsa);
+            exportManager.add(rsa);
+            return rsa;
+        }
+
+        @Override
+        public void removedService(ServiceReference<RemoteServiceAdmin> reference,
+                                   RemoteServiceAdmin rsa) {
+            exportManager.remove(rsa);
+            importManager.remove(rsa);
+            super.removedService(reference, rsa);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/DefaultExportPolicy.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/DefaultExportPolicy.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/DefaultExportPolicy.java
new file mode 100644
index 0000000..d867ccf
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/DefaultExportPolicy.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.exporter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.aries.rsa.spi.ExportPolicy;
+import org.osgi.framework.ServiceReference;
+
+/**
+ * The default is to not customize the way services are exported
+ */
+public class DefaultExportPolicy implements ExportPolicy {
+
+    @Override
+    public Map<String, ?> additionalParameters(ServiceReference<?> sref) {
+        return new HashMap<String, Object>();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointListenerNotifier.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointListenerNotifier.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointListenerNotifier.java
new file mode 100644
index 0000000..9b0386a
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointListenerNotifier.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.exporter;
+
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.aries.rsa.util.StringPlus;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks EndpointListeners and allows to notify them of endpoints.
+ */
+public class EndpointListenerNotifier implements EndpointListener {
+    private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerNotifier.class);
+    private enum NotifyType { ADDED, REMOVED };
+    private Map<EndpointListener, Set<Filter>> listeners;
+    private EndpointRepository endpointRepo;
+
+    public EndpointListenerNotifier(final EndpointRepository endpointRepo) {
+        this.endpointRepo = endpointRepo;
+        this.listeners = new ConcurrentHashMap<EndpointListener, Set<Filter>>();
+    }
+    
+    public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) {
+        Set<Filter> filters = new HashSet<Filter>();
+        List<String> scopes = StringPlus.normalize(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
+        for (String scope : scopes) {
+            try {
+                filters.add(FrameworkUtil.createFilter(scope));
+            } catch (InvalidSyntaxException e) {
+                LOG.error("invalid endpoint listener scope: {}", scope, e);
+            }
+        }
+        return filters;
+    }
+
+    public void add(EndpointListener ep, Set<Filter> filters) {
+        LOG.debug("new EndpointListener detected");
+        listeners.put(ep, filters);
+        for (EndpointDescription endpoint : endpointRepo.getAllEndpoints()) {
+            notifyListener(NotifyType.ADDED, ep, filters, endpoint);
+        }
+    }
+    
+    public void remove(EndpointListener ep) {
+        LOG.debug("EndpointListener modified");
+        listeners.remove(ep);
+    }
+    
+    @Override
+    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+        notifyListeners(NotifyType.ADDED, endpoint);
+    }
+
+    @Override
+    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+        notifyListeners(NotifyType.REMOVED, endpoint);
+    }
+
+    /**
+     * Notifies all endpoint listeners about endpoints being added or removed.
+     *
+     * @param added specifies whether endpoints were added (true) or removed (false)
+     * @param endpoints the endpoints the listeners should be notified about
+     */
+    private void notifyListeners(NotifyType type, EndpointDescription endpoint) {
+        for (EndpointListener listener : listeners.keySet()) {
+            notifyListener(type, listener, listeners.get(listener), endpoint);
+        }
+    }
+
+    /**
+     * Notifies an endpoint listener about endpoints being added or removed.
+     *
+     * @param type specifies whether endpoints were added (true) or removed (false)
+     * @param endpointListenerRef the ServiceReference of an EndpointListener to notify
+     * @param endpoints the endpoints the listener should be notified about
+     */
+    private void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters, 
+                        EndpointDescription endpoint) {
+        LOG.debug("Endpoint {}", type);
+        Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
+        for (Filter filter : matchingFilters) {
+            if (type == NotifyType.ADDED) {
+                listener.endpointAdded(endpoint, filter.toString());
+            } else {
+                listener.endpointRemoved(endpoint, filter.toString());
+            }
+        }
+    }
+    
+    private static Set<Filter> getMatchingFilters(Set<Filter> filters, EndpointDescription endpoint) {
+        Set<Filter> matchingFilters = new HashSet<Filter>();
+        Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
+        for (Filter filter : filters) {
+            if (filter.match(dict)) {
+                LOG.debug("Filter {} matches endpoint {}", filter, dict);
+                matchingFilters.add(filter);
+            } else {
+                LOG.trace("Filter {} does not match endpoint {}", filter, dict);
+            }
+        }
+        return matchingFilters;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointRepository.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointRepository.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointRepository.java
new file mode 100644
index 0000000..71a9a29
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/EndpointRepository.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.exporter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds all endpoints that are exported by a TopologyManager. For each ServiceReference that is exported a
+ * map is maintained which contains information on the endpoints for each RemoteAdminService that created the
+ * endpoints.
+ */
+@SuppressWarnings("rawtypes")
+public class EndpointRepository {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EndpointRepository.class);
+
+    private final Map<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>> exportedServices
+        = new LinkedHashMap<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>>();
+
+    private EndpointListener notifier;
+    
+    public void setNotifier(EndpointListener notifier) {
+        this.notifier = notifier;
+    }
+    
+    
+    /**
+     * Remove all services exported by the given rsa.
+     *
+     * @param rsa the RemoteServiceAdmin to remove
+     * @return list of removed endpoints
+     */
+    public synchronized List<EndpointDescription> removeRemoteServiceAdmin(RemoteServiceAdmin rsa) {
+        LOG.debug("RemoteServiceAdmin removed: {}", rsa.getClass().getName());
+        List<EndpointDescription> removedEndpoints = new ArrayList<EndpointDescription>();
+        for (Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports : exportedServices.values()) {
+            Collection<EndpointDescription> endpoints = exports.get(rsa);
+            if (endpoints != null) {
+                removedEndpoints.addAll(endpoints);
+                exports.remove(rsa);
+            }
+        }
+        endpointsRemoved(removedEndpoints);
+        return removedEndpoints;
+    }
+
+    public synchronized void removeService(ServiceReference sref) {
+        List<EndpointDescription> removedEndpoints = new ArrayList<EndpointDescription>();
+        Map<RemoteServiceAdmin, Collection<EndpointDescription>> rsaToEndpoints = exportedServices.get(sref);
+        if (rsaToEndpoints != null) {
+            for (Collection<EndpointDescription> endpoints : rsaToEndpoints.values()) {
+                removedEndpoints.addAll(endpoints);
+            }
+            exportedServices.remove(sref);
+        }
+        endpointsRemoved(removedEndpoints);
+    }
+
+    public synchronized void addService(ServiceReference sref) {
+        if (!exportedServices.containsKey(sref)) {
+            LOG.info("Marking service from bundle {} for export", sref.getBundle().getSymbolicName());
+            exportedServices.put(sref, new LinkedHashMap<RemoteServiceAdmin, Collection<EndpointDescription>>());
+        }
+    }
+
+    public synchronized void addEndpoints(ServiceReference sref, RemoteServiceAdmin rsa,
+                                   List<EndpointDescription> endpoints) {
+        addService(sref);
+        Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports = exportedServices.get(sref);
+        exports.put(rsa, endpoints);
+        endpointsAdded(endpoints);
+    }
+
+    synchronized boolean isAlreadyExportedForRsa(ServiceReference sref, RemoteServiceAdmin rsa) {
+        Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports = exportedServices.get(sref);
+        return exports != null && exports.containsKey(rsa);
+    }
+
+    public synchronized Collection<EndpointDescription> getAllEndpoints() {
+        List<EndpointDescription> allEndpoints = new ArrayList<EndpointDescription>();
+        for (Map<RemoteServiceAdmin, Collection<EndpointDescription>> exports : exportedServices.values()) {
+            for (Collection<EndpointDescription> endpoints : exports.values()) {
+                allEndpoints.addAll(endpoints);
+            }
+        }
+        return allEndpoints;
+    }
+
+    public synchronized Set<ServiceReference> getServicesToBeExportedFor(RemoteServiceAdmin rsa) {
+        Set<ServiceReference> servicesToBeExported = new HashSet<ServiceReference>();
+        for (Map.Entry<ServiceReference, Map<RemoteServiceAdmin, Collection<EndpointDescription>>> entry
+                : exportedServices.entrySet()) {
+            if (!entry.getValue().containsKey(rsa)) {
+                servicesToBeExported.add(entry.getKey());
+            }
+        }
+        return servicesToBeExported;
+    }
+
+    private void endpointsAdded(List<EndpointDescription> endpoints) {
+        for (EndpointDescription epd : endpoints) {
+            notifier.endpointAdded(epd, null);
+        }
+    }
+    
+    private void endpointsRemoved(List<EndpointDescription> endpoints) {
+        for (EndpointDescription epd : endpoints) {
+            notifier.endpointRemoved(epd, null);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
new file mode 100644
index 0000000..0557116
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/exporter/TopologyManagerExport.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.exporter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import org.apache.aries.rsa.spi.ExportPolicy;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.ServiceEvent;
+import org.osgi.framework.ServiceListener;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.ExportReference;
+import org.osgi.service.remoteserviceadmin.ExportRegistration;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages exported endpoints of DOSGi services and notifies EndpointListeners of changes.
+ *
+ * <li> Tracks local RemoteServiceAdmin instances by using a ServiceTracker
+ * <li> Uses a ServiceListener to track local OSGi services
+ * <li> When a service is published that is supported by DOSGi the
+ *      known RemoteServiceAdmins are instructed to export the service and
+ *      the EndpointListeners are notified
+ * <li> When a service is unpublished the EndpointListeners are notified.
+ *      The endpoints are not closed as the ExportRegistration takes care of this
+ */
+public class TopologyManagerExport implements ServiceListener {
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerExport.class);
+
+    private final Executor execService;
+    private final EndpointRepository endpointRepo;
+    private ExportPolicy policy;
+    private final Set<RemoteServiceAdmin> rsaSet;
+
+
+    public TopologyManagerExport(final EndpointRepository endpointRepo, Executor executor, ExportPolicy policy) {
+        this.endpointRepo = endpointRepo;
+        this.policy = policy;
+        this.rsaSet = new HashSet<RemoteServiceAdmin>();
+        this.execService = executor;
+    }
+
+    // track all service registrations so we can export any services that are configured to be exported
+    // ServiceListener events may be delivered out of order, concurrently, re-entrant, etc. (see spec or docs)
+    public void serviceChanged(ServiceEvent event) {
+        ServiceReference<?> sref = event.getServiceReference();
+        if (event.getType() == ServiceEvent.REGISTERED) {
+            LOG.debug("Received REGISTERED ServiceEvent: {}", event);
+            export(sref);
+        } else if (event.getType() == ServiceEvent.UNREGISTERING) {
+            LOG.debug("Received UNREGISTERING ServiceEvent: {}", event);
+            endpointRepo.removeService(sref);
+        }
+    }
+
+    public void add(RemoteServiceAdmin rsa) {
+        rsaSet.add(rsa);
+        for (ServiceReference<?> serviceRef : endpointRepo.getServicesToBeExportedFor(rsa)) {
+            export(serviceRef);
+        }
+    };
+    
+    public void remove(RemoteServiceAdmin rsa) {
+        rsaSet.remove(rsa);
+        endpointRepo.removeRemoteServiceAdmin(rsa);
+    };
+
+    private void export(final ServiceReference<?> sref) {
+        execService.execute(new Runnable() {
+            public void run() {
+                doExport(sref);
+            }
+        });
+    }
+
+    private void doExport(final ServiceReference<?> sref) {
+        Map<String, ?> addProps = policy.additionalParameters(sref);
+        if (!shouldExport(sref, addProps)) {
+            LOG.debug("Skipping service {}", sref);
+            return;
+        }
+        LOG.debug("Exporting service {}", sref);
+        endpointRepo.addService(sref); // mark for future export even if there are currently no RSAs
+        if (rsaSet.size() == 0) {
+            LOG.error("No RemoteServiceAdmin available! Unable to export service from bundle {}, interfaces: {}",
+                    getSymbolicName(sref.getBundle()),
+                    sref.getProperty(org.osgi.framework.Constants.OBJECTCLASS));
+            return;
+        }
+
+        for (RemoteServiceAdmin remoteServiceAdmin : rsaSet) {
+            LOG.info("TopologyManager: handling remoteServiceAdmin " + remoteServiceAdmin);
+            if (endpointRepo.isAlreadyExportedForRsa(sref, remoteServiceAdmin)) {
+                // already handled by this remoteServiceAdmin
+                LOG.debug("already handled by this remoteServiceAdmin -> skipping");
+            } else {
+                
+                exportServiceUsingRemoteServiceAdmin(sref, remoteServiceAdmin, addProps);
+            }
+        }
+    }
+
+    private boolean shouldExport(ServiceReference<?> sref, Map<String, ?> addProps) {
+        String exported = (String)sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
+        String addExported = (String)addProps.get(RemoteConstants.SERVICE_EXPORTED_INTERFACES);
+        String effectiveExported = addExported != null ? addExported : exported;
+        return (effectiveExported != null) && !effectiveExported.isEmpty();
+    }
+
+    private Object getSymbolicName(Bundle bundle) {
+        return bundle == null ? null : bundle.getSymbolicName();
+    }
+
+    private void exportServiceUsingRemoteServiceAdmin(final ServiceReference<?> sref,
+                                                      final RemoteServiceAdmin remoteServiceAdmin, 
+                                                      Map<String, ?> addProps) {
+        // abort if the service was unregistered by the time we got here
+        // (we check again at the end, but this optimization saves unnecessary heavy processing)
+        if (sref.getBundle() == null) {
+            LOG.info("TopologyManager: export aborted for {} since it was unregistered", sref);
+            endpointRepo.removeService(sref);
+            return;
+        }
+        // do the export
+        LOG.debug("exporting {}...", sref);
+        // TODO: additional parameter Map?
+        Collection<ExportRegistration> exportRegs = remoteServiceAdmin.exportService(sref, addProps);
+        // process successful/failed registrations
+        List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
+        for (ExportRegistration reg : exportRegs) {
+            if (reg.getException() == null) {
+                EndpointDescription endpoint = getExportedEndpoint(reg);
+                LOG.info("TopologyManager: export succeeded for {}, endpoint ", sref, endpoint);
+                endpoints.add(endpoint);
+            } else {
+                LOG.error("TopologyManager: export failed for {}", sref);
+                reg.close();
+            }
+        }
+        // abort export if service was unregistered in the meanwhile (since we have a race
+        // with the unregister event which may have already been handled, so we'll miss it)
+        if (sref.getBundle() == null) {
+            LOG.info("TopologyManager: export reverted for {} since service was unregistered", sref);
+            endpointRepo.removeService(sref);
+            for (ExportRegistration reg : exportRegs) {
+                reg.close();
+            }
+            return;
+        }
+        // add the new exported endpoints
+        if (!endpoints.isEmpty()) {
+            LOG.info("TopologyManager: export successful for {}, endpoints: {}", sref, endpoints);
+            endpointRepo.addEndpoints(sref, remoteServiceAdmin, endpoints);
+        }
+    }
+
+    /**
+     * Retrieves an exported Endpoint (while safely handling nulls).
+     *
+     * @param exReg an export registration
+     * @return exported Endpoint or null if not present
+     */
+    private EndpointDescription getExportedEndpoint(ExportRegistration exReg) {
+        ExportReference ref = (exReg == null) ? null : exReg.getExportReference();
+        return (ref == null) ? null : ref.getExportedEndpoint();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
new file mode 100644
index 0000000..1207f9f
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointListenerManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages an EndpointListener and adjusts its scope according to requested service filters.
+ */
+public class EndpointListenerManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);
+
+    private final BundleContext bctx;
+    private volatile ServiceRegistration<EndpointListener> serviceRegistration;
+    private final List<String> filters = new ArrayList<String>();
+    private final EndpointListener endpointListener;
+
+    public EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
+        this.bctx = bc;
+        this.endpointListener = endpointListener;
+    }
+
+    protected void start() {
+        serviceRegistration = bctx.registerService(EndpointListener.class, endpointListener,
+                                                   getRegistrationProperties());
+    }
+
+    public void stop() {
+        if (serviceRegistration != null) {
+            serviceRegistration.unregister();
+        }
+    }
+
+    protected void extendScope(String filter) {
+        if (filter == null) {
+            return;
+        }
+        LOG.debug("EndpointListener: extending scope by {}", filter);
+        synchronized (filters) {
+            filters.add(filter);
+        }
+        updateRegistration();
+    }
+
+    protected void reduceScope(String filter) {
+        if (filter == null) {
+            return;
+        }
+        LOG.debug("EndpointListener: reducing scope by {}", filter);
+        synchronized (filters) {
+            filters.remove(filter);
+        }
+        updateRegistration();
+    }
+
+    private Dictionary<String, Object> getRegistrationProperties() {
+        Dictionary<String, Object> p = new Hashtable<String, Object>();
+
+        synchronized (filters) {
+            LOG.debug("Current filter: {}", filters);
+            p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, new ArrayList<String>(filters));
+        }
+
+        return p;
+    }
+
+    private void updateRegistration() {
+        if (serviceRegistration != null) {
+            serviceRegistration.setProperties(getRegistrationProperties());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/FilterHelper.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/FilterHelper.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/FilterHelper.java
new file mode 100644
index 0000000..480ae46
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/FilterHelper.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.osgi.framework.Constants;
+
+public final class FilterHelper {
+    private static final String OBJECTCLASS_EXPRESSION = ".*\\(" + Constants.OBJECTCLASS + "=([a-zA-Z_0-9.]+)\\).*";
+    private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(OBJECTCLASS_EXPRESSION);
+
+    private FilterHelper() {
+        // prevent instantiation
+    }
+
+    public static String getObjectClass(String filter) {
+        if (filter != null) {
+            Matcher matcher = OBJECTCLASS_PATTERN.matcher(filter);
+            if (matcher.matches() && matcher.groupCount() >= 1) {
+                return matcher.group(1);
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java
new file mode 100644
index 0000000..6766fc1
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ListenerHookImpl.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for service listeners and informs ServiceInterestListener about added and removed interest
+ * in services
+ */
+public class ListenerHookImpl implements ListenerHook {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ListenerHookImpl.class);
+
+    // From the old impl.
+    private static final Set<String> SYSTEM_PACKAGES;
+    static {
+        SYSTEM_PACKAGES = new HashSet<String>();
+        SYSTEM_PACKAGES.add("org.osgi.service");
+        SYSTEM_PACKAGES.add("org.apache.felix");
+        SYSTEM_PACKAGES.add("org.ops4j.pax.logging");
+        SYSTEM_PACKAGES.add("ch.ethz.iks.slp");
+        SYSTEM_PACKAGES.add("org.ungoverned.osgi.service");
+        SYSTEM_PACKAGES.add("org.springframework.osgi.context.event.OsgiBundleApplicationContextListener");
+        SYSTEM_PACKAGES.add("java.net.ContentHandler");
+    }
+
+    private final BundleContext bctx;
+    private final ServiceInterestListener serviceInterestListener;
+    private final String frameworkUUID;
+
+    public ListenerHookImpl(BundleContext bc, ServiceInterestListener serviceInterestListener) {
+        this.bctx = bc;
+        this.frameworkUUID = bctx.getProperty(Constants.FRAMEWORK_UUID);
+        this.serviceInterestListener = serviceInterestListener;
+    }
+
+    @Override
+    public void added(Collection<ListenerInfo> listeners) {
+        LOG.debug("added listeners {}", listeners);
+        for (ListenerInfo listenerInfo : listeners) {
+            LOG.debug("Filter {}", listenerInfo.getFilter());
+
+            String className = FilterHelper.getObjectClass(listenerInfo.getFilter());
+
+            if (listenerInfo.getBundleContext().equals(bctx)) {
+                LOG.debug("ListenerHookImpl: skipping request from myself");
+                continue;
+            }
+
+            if (listenerInfo.getFilter() == null) {
+                LOG.debug("skipping empty filter");
+                continue;
+            }
+
+            if (isClassExcluded(className)) {
+                LOG.debug("Skipping import request for excluded class [{}]", className);
+                continue;
+            }
+            String exFilter = extendFilter(listenerInfo.getFilter());
+            serviceInterestListener.addServiceInterest(exFilter);
+        }
+    }
+
+    @Override
+    public void removed(Collection<ListenerInfo> listeners) {
+        LOG.debug("removed listeners {}", listeners);
+
+        for (ListenerInfo listenerInfo : listeners) {
+            LOG.debug("Filter {}", listenerInfo.getFilter());
+
+            // TODO: determine if service was handled?
+            String exFilter = extendFilter(listenerInfo.getFilter());
+            serviceInterestListener.removeServiceInterest(exFilter);
+        }
+    }
+
+    private static boolean isClassExcluded(String className) {
+        if (className == null) {
+            return true;
+        }
+
+        for (String p : SYSTEM_PACKAGES) {
+            if (className.startsWith(p)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    String extendFilter(String filter) {
+        return "(&" + filter + "(!(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID + "=" + frameworkUUID + ")))";
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java
new file mode 100644
index 0000000..4aa648f
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/RSATracker.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+
+public interface RSATracker {
+    void added(RemoteServiceAdmin rsa);
+    void removed(RemoteServiceAdmin rsa);
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounter.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounter.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounter.java
new file mode 100644
index 0000000..71e796c
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ReferenceCounter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Manages a reference count per key.
+ *
+ * @param <K> the key type
+ */
+public class ReferenceCounter<K> {
+
+    private final ConcurrentMap<K, Integer> counts = new ConcurrentHashMap<K, Integer>();
+
+    /**
+     * Increases the reference count for the given key,
+     * or sets it to 1 if the key has no existing count.
+     *
+     * @param key a key
+     * @return the updated reference count
+     */
+    public int add(K key) {
+        while (true) {
+            Integer count = counts.get(key);
+            if (count == null) {
+                if (counts.putIfAbsent(key, 1) == null) {
+                    return 1;
+                }
+            } else if (counts.replace(key, count, count + 1)) {
+                return count + 1;
+            }
+        }
+    }
+
+    /**
+     * Decreases the reference count for the given key,
+     * and removes it if it reaches 0.
+     * If the key has no existing count, -1 is returned.
+     *
+     * @param key a key
+     * @return the updated reference count, or -1 if the key has no existing count
+     */
+    public int remove(K key) {
+        while (true) {
+            Integer count = counts.get(key);
+            if (count == null) {
+                return -1;
+            }
+            if (count == 1) {
+                if (counts.remove(key, 1)) {
+                    return 0;
+                }
+            } else if (counts.replace(key, count, count - 1)) {
+                return count - 1;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ServiceInterestListener.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ServiceInterestListener.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ServiceInterestListener.java
new file mode 100644
index 0000000..9e7b70c
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ServiceInterestListener.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+public interface ServiceInterestListener {
+
+    void addServiceInterest(String filter);
+
+    void removeServiceInterest(String filter);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
new file mode 100644
index 0000000..e548288
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.ImportReference;
+import org.osgi.service.remoteserviceadmin.ImportRegistration;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdminEvent;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdminListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for remote endpoints using the EndpointListener interface and the EndpointListenerManager.
+ * Listens for local service interests using the ListenerHookImpl that calls back through the
+ * ServiceInterestListener interface.
+ * Manages local creation and destruction of service imports using the available RemoteServiceAdmin services.
+ */
+public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener, ServiceInterestListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyManagerImport.class);
+    private ExecutorService execService;
+
+    private final EndpointListenerManager endpointListenerManager;
+    private final BundleContext bctx;
+    private Set<RemoteServiceAdmin> rsaSet;
+    private final ListenerHookImpl listenerHook;
+
+    /**
+     * If set to false only one service is imported for each import interest even it multiple services are
+     * available. If set to true, all available services are imported.
+     *
+     * TODO: Make this available as a configuration option
+     */
+    private boolean importAllAvailable = true;
+
+    /**
+     * Contains an instance of the Class Import Interest for each distinct import request. If the same filter
+     * is requested multiple times the existing instance of the Object increments an internal reference
+     * counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference
+     * counter until it reaches zero. in this case the interest is removed.
+     */
+    private final ReferenceCounter<String> importInterestsCounter = new ReferenceCounter<String>();
+
+    /**
+     * List of Endpoints by matched filter that were reported by the EndpointListener and can be imported
+     */
+    private final Map<String /* filter */, List<EndpointDescription>> importPossibilities
+        = new HashMap<String, List<EndpointDescription>>();
+
+    /**
+     * List of already imported Endpoints by their matched filter
+     */
+    private final Map<String /* filter */, List<ImportRegistration>> importedServices
+        = new HashMap<String, List<ImportRegistration>>();
+    
+
+    public TopologyManagerImport(BundleContext bc) {
+        this.rsaSet = new HashSet<RemoteServiceAdmin>();
+        bctx = bc;
+        endpointListenerManager = new EndpointListenerManager(bctx, this);
+        execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        listenerHook = new ListenerHookImpl(bc, this);
+    }
+    
+    public void start() {
+        bctx.registerService(RemoteServiceAdminListener.class, this, null);
+        bctx.registerService(ListenerHook.class, listenerHook, null);
+        endpointListenerManager.start();
+    }
+
+    public void stop() {
+        endpointListenerManager.stop();
+        execService.shutdown();
+        // this is called from Activator.stop(), which implicitly unregisters our registered services
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#addServiceInterest(java.lang.String)
+     */
+    public void addServiceInterest(String filter) {
+        if (importInterestsCounter.add(filter) == 1) {
+            endpointListenerManager.extendScope(filter);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#removeServiceInterest(java.lang.String)
+     */
+    public void removeServiceInterest(String filter) {
+        if (importInterestsCounter.remove(filter) == 0) {
+            LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
+            endpointListenerManager.reduceScope(filter);
+            synchronized (importedServices) {
+                List<ImportRegistration> irs = importedServices.remove(filter);
+                if (irs != null) {
+                    for (ImportRegistration ir : irs) {
+                        ir.close();
+                    }
+                }
+            }
+        }
+    }
+
+    public void endpointAdded(EndpointDescription endpoint, String filter) {
+        if (filter == null) {
+            LOG.error("Endpoint is not handled because no matching filter was provided!");
+            return;
+        }
+        LOG.debug("importable service added for filter {}, endpoint {}", filter, endpoint);
+        addImportPossibility(endpoint, filter);
+        triggerImport(filter);
+    }
+
+    public void endpointRemoved(EndpointDescription endpoint, String filter) {
+        LOG.debug("EndpointRemoved {}", endpoint);
+        removeImportPossibility(endpoint, filter);
+        triggerImport(filter);
+    }
+
+    private void addImportPossibility(EndpointDescription endpoint, String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> endpoints = importPossibilities.get(filter);
+            if (endpoints == null) {
+                endpoints = new ArrayList<EndpointDescription>();
+                importPossibilities.put(filter, endpoints);
+            }
+            // prevent adding the same endpoint multiple times, which can happen sometimes,
+            // and which causes imports to remain available even when services are actually down
+            if (!endpoints.contains(endpoint)) {
+                endpoints.add(endpoint);
+            }
+        }
+    }
+
+    private void removeImportPossibility(EndpointDescription endpoint, String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> endpoints = importPossibilities.get(filter);
+            if (endpoints != null) {
+                endpoints.remove(endpoint);
+                if (endpoints.isEmpty()) {
+                    importPossibilities.remove(filter);
+                }
+            }
+        }
+    }
+
+    public void add(RemoteServiceAdmin rsa) {
+        rsaSet.add(rsa);
+        synchronized (importPossibilities) {
+            for (String filter : importPossibilities.keySet()) {
+                triggerImport(filter);
+            }
+        }
+    }
+    
+    public void remove(RemoteServiceAdmin rsa) {
+        rsaSet.remove(rsa);
+    }
+
+
+    private void triggerImport(final String filter) {
+        LOG.debug("Import of a service for filter {} was queued", filter);
+
+        execService.execute(new Runnable() {
+            public void run() {
+                try {
+                    unexportNotAvailableServices(filter);
+                    importServices(filter);
+                } catch (Exception e) {
+                    LOG.error(e.getMessage(), e);
+                }
+                // Notify EndpointListeners? NO!
+            }
+        });
+    }
+
+    private void unexportNotAvailableServices(String filter) {
+        synchronized (importedServices) {
+            List<ImportRegistration> importRegistrations = importedServices.get(filter);
+            if (importRegistrations != null) {
+                // iterate over a copy
+                for (ImportRegistration ir : new ArrayList<ImportRegistration>(importRegistrations)) {
+                    EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
+                    if (!isImportPossibilityAvailable(endpoint, filter)) {
+                        removeImport(ir, null); // also unexports the service
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> endpoints = importPossibilities.get(filter);
+            return endpoints != null && endpoints.contains(endpoint);
+        }
+    }
+
+    // return a copy to prevent sync issues
+    private List<EndpointDescription> getImportPossibilitiesCopy(String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> possibilities = importPossibilities.get(filter);
+            return possibilities == null
+                ? Collections.<EndpointDescription>emptyList()
+                : new ArrayList<EndpointDescription>(possibilities);
+        }
+    }
+
+    private void importServices(String filter) {
+        synchronized (importedServices) {
+            List<ImportRegistration> importRegistrations = importedServices.get(filter);
+            for (EndpointDescription endpoint : getImportPossibilitiesCopy(filter)) {
+                // TODO but optional: if the service is already imported and the endpoint is still
+                // in the list of possible imports check if a "better" endpoint is now in the list
+                if (!alreadyImported(endpoint, importRegistrations)) {
+                    // service not imported yet -> import it now
+                    ImportRegistration ir = importService(endpoint);
+                    if (ir != null) {
+                        // import was successful
+                        if (importRegistrations == null) {
+                            importRegistrations = new ArrayList<ImportRegistration>();
+                            importedServices.put(filter, importRegistrations);
+                        }
+                        importRegistrations.add(ir);
+                        if (!importAllAvailable) {
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean alreadyImported(EndpointDescription endpoint, List<ImportRegistration> importRegistrations) {
+        if (importRegistrations != null) {
+            for (ImportRegistration ir : importRegistrations) {
+                if (endpoint.equals(ir.getImportReference().getImportedEndpoint())) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Tries to import the service with each rsa until one import is successful
+     *
+     * @param endpoint endpoint to import
+     * @return import registration of the first successful import
+     */
+    private ImportRegistration importService(EndpointDescription endpoint) {
+        for (RemoteServiceAdmin rsa : rsaSet) {
+            ImportRegistration ir = rsa.importService(endpoint);
+            if (ir != null) {
+                if (ir.getException() == null) {
+                    LOG.debug("Service import was successful {}", ir);
+                    return ir;
+                } else {
+                    LOG.info("Error importing service " + endpoint, ir.getException());
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Remove and close (unexport) the given import. The import is specified either
+     * by its ImportRegistration or by its ImportReference (only one of them must
+     * be specified).
+     * <p>
+     * If this method is called from within iterations on the underlying data structure,
+     * the iterations must be made on copies of the structures rather than the original
+     * references in order to prevent ConcurrentModificationExceptions.
+     *
+     * @param reg the import registration to remove
+     * @param ref the import reference to remove
+     */
+    private void removeImport(ImportRegistration reg, ImportReference ref) {
+        // this method may be called recursively by calling ImportRegistration.close()
+        // and receiving a RemoteServiceAdminEvent for its unregistration, which results
+        // in a ConcurrentModificationException. We avoid this by closing the registrations
+        // only after data structure manipulation is done, and being re-entrant.
+        synchronized (importedServices) {
+            List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
+            for (Iterator<List<ImportRegistration>> it1 = importedServices.values().iterator(); it1.hasNext();) {
+                Collection<ImportRegistration> irs = it1.next();
+                for (Iterator<ImportRegistration> it2 = irs.iterator(); it2.hasNext();) {
+                    ImportRegistration ir = it2.next();
+                    if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
+                        removed.add(ir);
+                        it2.remove();
+                    }
+                }
+                if (irs.isEmpty()) {
+                    it1.remove();
+                }
+            }
+            for (ImportRegistration ir : removed) {
+                ir.close();
+            }
+        }
+    }
+
+    public void remoteAdminEvent(RemoteServiceAdminEvent event) {
+        if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
+            removeImport(null, event.getImportReference());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
deleted file mode 100644
index 62ec1a9..0000000
--- a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.topologymanager;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
-import org.apache.cxf.dosgi.topologymanager.exporter.DefaultExportPolicy;
-import org.apache.cxf.dosgi.topologymanager.exporter.EndpointListenerNotifier;
-import org.apache.cxf.dosgi.topologymanager.exporter.EndpointRepository;
-import org.apache.cxf.dosgi.topologymanager.exporter.TopologyManagerExport;
-import org.apache.cxf.dosgi.topologymanager.importer.TopologyManagerImport;
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceEvent;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Activator implements BundleActivator {
-    public static final String RSA_EXPORT_POLICY_FILTER = "rsa.export.policy.filter";
-    static final String DOSGI_SERVICES = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)";
-    private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
-
-    private TopologyManagerExport exportManager;
-    private TopologyManagerImport importManager;
-    private EndpointListenerNotifier notifier;
-    private ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> rsaTracker;
-    private ThreadPoolExecutor exportExecutor;
-    private ServiceTracker<EndpointListener, EndpointListener> epListenerTracker;
-    private ServiceTracker<ExportPolicy, ExportPolicy> policyTracker;
-
-    public void start(final BundleContext bc) throws Exception {
-        Dictionary<String, String> props = new Hashtable<String, String>();
-        props.put("name", "default");
-        bc.registerService(ExportPolicy.class, new DefaultExportPolicy(), props);
-
-        Filter policyFilter = exportPolicyFilter(bc);
-        policyTracker = new ServiceTracker<ExportPolicy, ExportPolicy>(bc, policyFilter, null) {
-
-            @Override
-            public ExportPolicy addingService(ServiceReference<ExportPolicy> reference) {
-                ExportPolicy policy = super.addingService(reference);
-                if (exportManager == null) {
-                    doStart(bc, policy);
-                }
-                return policy;
-            }
-
-            @Override
-            public void removedService(ServiceReference<ExportPolicy> reference, ExportPolicy service) {
-                if (exportManager != null) {
-                    doStop(bc);
-                }
-                super.removedService(reference, service);
-            }
-        };
-        policyTracker.open();
-    }
-
-    private Filter exportPolicyFilter(BundleContext bc) throws InvalidSyntaxException {
-        String filter = bc.getProperty(RSA_EXPORT_POLICY_FILTER);
-        if (filter == null) {
-            filter = "(name=default)";
-        }
-        return FrameworkUtil.createFilter(String.format("(&(objectClass=%s)%s)", ExportPolicy.class.getName(), filter));
-    }
-
-    public void doStart(final BundleContext bc, ExportPolicy policy) {
-        LOG.debug("TopologyManager: start()");
-        EndpointRepository endpointRepo = new EndpointRepository();
-        notifier = new EndpointListenerNotifier(endpointRepo);
-        epListenerTracker = new EndpointListenerTracker(bc);
-        endpointRepo.setNotifier(notifier);
-        exportExecutor = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        exportManager = new TopologyManagerExport(endpointRepo, exportExecutor, policy);
-        importManager = new TopologyManagerImport(bc);
-        rsaTracker = new RSATracker(bc, RemoteServiceAdmin.class, null);
-        bc.addServiceListener(exportManager);
-        rsaTracker.open();
-        epListenerTracker.open();
-        exportExistingServices(bc);
-        importManager.start();
-    }
-
-    public void stop(BundleContext bc) throws Exception {
-        policyTracker.close();
-    }
-
-    public void doStop(BundleContext bc) {
-        LOG.debug("TopologyManager: stop()");
-        epListenerTracker.close();
-        bc.removeServiceListener(exportManager);
-        exportExecutor.shutdown();
-        importManager.stop();
-        rsaTracker.close();
-        exportManager = null;
-    }
-
-    public void exportExistingServices(BundleContext context) {
-        try {
-            // cast to String is necessary for compiling against OSGi core version >= 4.3
-            ServiceReference<?>[] references = context.getServiceReferences((String)null, DOSGI_SERVICES);
-            if (references != null) {
-                for (ServiceReference<?> sref : references) {
-                    exportManager.serviceChanged(new ServiceEvent(ServiceEvent.REGISTERED, sref));
-                }
-            }
-        } catch (InvalidSyntaxException e) {
-            LOG.error("Error in filter {}. This should not occur!", DOSGI_SERVICES);
-        }
-    }
-    
-    private final class EndpointListenerTracker extends ServiceTracker<EndpointListener, EndpointListener> {
-        private EndpointListenerTracker(BundleContext context) {
-            super(context, EndpointListener.class, null);
-        }
-
-        @Override
-        public EndpointListener addingService(ServiceReference<EndpointListener> reference) {
-            EndpointListener listener = super.addingService(reference);
-            notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
-            return listener;
-        }
-
-        @Override
-        public void modifiedService(ServiceReference<EndpointListener> reference,
-                                    EndpointListener listener) {
-            super.modifiedService(reference, listener);
-            notifier.add(listener, EndpointListenerNotifier.getFiltersFromEndpointListenerScope(reference));
-        }
-
-        @Override
-        public void removedService(ServiceReference<EndpointListener> reference,
-                                   EndpointListener listener) {
-            notifier.remove(listener);
-            super.removedService(reference, listener);
-        }
-    }
-
-    private final class RSATracker extends ServiceTracker<RemoteServiceAdmin, RemoteServiceAdmin> {
-        private RSATracker(BundleContext context, Class<RemoteServiceAdmin> clazz,
-                           ServiceTrackerCustomizer<RemoteServiceAdmin, RemoteServiceAdmin> customizer) {
-            super(context, clazz, customizer);
-        }
-
-        @Override
-        public RemoteServiceAdmin addingService(ServiceReference<RemoteServiceAdmin> reference) {
-            RemoteServiceAdmin rsa = super.addingService(reference);
-            LOG.debug("New RemoteServiceAdmin {} detected, trying to import and export services with it", rsa);
-            importManager.add(rsa);
-            exportManager.add(rsa);
-            return rsa;
-        }
-
-        @Override
-        public void removedService(ServiceReference<RemoteServiceAdmin> reference,
-                                   RemoteServiceAdmin rsa) {
-            exportManager.remove(rsa);
-            importManager.remove(rsa);
-            super.removedService(reference, rsa);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
deleted file mode 100644
index 689ebab..0000000
--- a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/DefaultExportPolicy.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.topologymanager.exporter;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.cxf.dosgi.dsw.api.ExportPolicy;
-import org.osgi.framework.ServiceReference;
-
-/**
- * The default is to not customize the way services are exported
- */
-public class DefaultExportPolicy implements ExportPolicy {
-
-    @Override
-    public Map<String, ?> additionalParameters(ServiceReference<?> sref) {
-        return new HashMap<String, Object>();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/5f4c6604/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java b/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
deleted file mode 100644
index 13d7dab..0000000
--- a/topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/exporter/EndpointListenerNotifier.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.dosgi.topologymanager.exporter;
-
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.osgi.framework.Filter;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tracks EndpointListeners and allows to notify them of endpoints.
- */
-public class EndpointListenerNotifier implements EndpointListener {
-    private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerNotifier.class);
-    private enum NotifyType { ADDED, REMOVED };
-    private Map<EndpointListener, Set<Filter>> listeners;
-    private EndpointRepository endpointRepo;
-
-    public EndpointListenerNotifier(final EndpointRepository endpointRepo) {
-        this.endpointRepo = endpointRepo;
-        this.listeners = new ConcurrentHashMap<EndpointListener, Set<Filter>>();
-    }
-    
-    public static Set<Filter> getFiltersFromEndpointListenerScope(ServiceReference<EndpointListener> sref) {
-        Set<Filter> filters = new HashSet<Filter>();
-        String[] scopes = StringPlus.parse(sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE));
-        for (String scope : scopes) {
-            try {
-                filters.add(FrameworkUtil.createFilter(scope));
-            } catch (InvalidSyntaxException e) {
-                LOG.error("invalid endpoint listener scope: {}", scope, e);
-            }
-        }
-        return filters;
-    }
-
-    public void add(EndpointListener ep, Set<Filter> filters) {
-        LOG.debug("new EndpointListener detected");
-        listeners.put(ep, filters);
-        for (EndpointDescription endpoint : endpointRepo.getAllEndpoints()) {
-            notifyListener(NotifyType.ADDED, ep, filters, endpoint);
-        }
-    }
-    
-    public void remove(EndpointListener ep) {
-        LOG.debug("EndpointListener modified");
-        listeners.remove(ep);
-    }
-    
-    @Override
-    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
-        notifyListeners(NotifyType.ADDED, endpoint);
-    }
-
-    @Override
-    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
-        notifyListeners(NotifyType.REMOVED, endpoint);
-    }
-
-    /**
-     * Notifies all endpoint listeners about endpoints being added or removed.
-     *
-     * @param added specifies whether endpoints were added (true) or removed (false)
-     * @param endpoints the endpoints the listeners should be notified about
-     */
-    private void notifyListeners(NotifyType type, EndpointDescription endpoint) {
-        for (EndpointListener listener : listeners.keySet()) {
-            notifyListener(type, listener, listeners.get(listener), endpoint);
-        }
-    }
-
-    /**
-     * Notifies an endpoint listener about endpoints being added or removed.
-     *
-     * @param type specifies whether endpoints were added (true) or removed (false)
-     * @param endpointListenerRef the ServiceReference of an EndpointListener to notify
-     * @param endpoints the endpoints the listener should be notified about
-     */
-    private void notifyListener(NotifyType type, EndpointListener listener, Set<Filter> filters, 
-                        EndpointDescription endpoint) {
-        LOG.debug("Endpoint {}", type);
-        Set<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
-        for (Filter filter : matchingFilters) {
-            if (type == NotifyType.ADDED) {
-                listener.endpointAdded(endpoint, filter.toString());
-            } else {
-                listener.endpointRemoved(endpoint, filter.toString());
-            }
-        }
-    }
-    
-    private static Set<Filter> getMatchingFilters(Set<Filter> filters, EndpointDescription endpoint) {
-        Set<Filter> matchingFilters = new HashSet<Filter>();
-        Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
-        for (Filter filter : filters) {
-            if (filter.match(dict)) {
-                LOG.debug("Filter {} matches endpoint {}", filter, dict);
-                matchingFilters.add(filter);
-            } else {
-                LOG.trace("Filter {} does not match endpoint {}", filter, dict);
-            }
-        }
-        return matchingFilters;
-    }
-
-}


Mime
View raw message