cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject svn commit: r1403685 [1/2] - in /cxf/dosgi/trunk/dsw/cxf-topology-manager: ./ src/main/java/org/apache/cxf/dosgi/topologymanager/ src/test/java/org/apache/cxf/dosgi/topologymanager/
Date Tue, 30 Oct 2012 13:10:21 GMT
Author: cschneider
Date: Tue Oct 30 13:10:20 2012
New Revision: 1403685

URL: http://svn.apache.org/viewvc?rev=1403685&view=rev
Log:
DOSGI-134 Refactoring TopologyManager

Added:
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerManager.java
      - copied, changed from r1401717, cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerImpl.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerNotifier.java   (with props)
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ExportRepository.java   (with props)
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RefManager.java   (with props)
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminLifeCycleListener.java   (with props)
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminTracker.java
      - copied, changed from r1401717, cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminList.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ServiceInterestListener.java   (with props)
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerNotifierTest.java
      - copied, changed from r1401717, cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/TopologyManagerTest.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ListenerHookImplTest.java
      - copied, changed from r1401717, cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/UtilsTest.java
Removed:
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerImpl.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminList.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminListenerImpl.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ServiceListenerImpl.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Utils.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/ActivatorTest.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminListTest.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminListenerTest.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/TopologyManagerTest.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/UtilsTest.java
Modified:
    cxf/dosgi/trunk/dsw/cxf-topology-manager/pom.xml
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ListenerHookImpl.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManager.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManagerImport.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerImplTest.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/TopologyManagerImportTest.java

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/pom.xml
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/pom.xml?rev=1403685&r1=1403684&r2=1403685&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/pom.xml (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/pom.xml Tue Oct 30 13:10:20 2012
@@ -16,11 +16,9 @@
     -->
 
     <modelVersion>4.0.0</modelVersion>
-    <groupId>org.apache.cxf.dosgi</groupId>
     <artifactId>cxf-dosgi-ri-topology-manager</artifactId>
     <packaging>bundle</packaging>
     <name>Distributed OSGi Topology Manager implementation</name>
-    <version>1.4-SNAPSHOT</version>
 
     <parent>
         <groupId>org.apache.cxf.dosgi</groupId>
@@ -36,7 +34,7 @@
             *
         </bundle.import.package>
         <bundle.export.package>
-            org.apache.cxf.dosgi.*;version="${project.version}"
+            !*
         </bundle.export.package>
     </properties>
 
@@ -65,7 +63,6 @@
         <dependency>
             <groupId>org.easymock</groupId>
             <artifactId>easymockclassextension</artifactId>
-            <version>2.4</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
@@ -92,11 +89,6 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <excludes>
-                        <exclude>**/TestUtils*</exclude>
-                    </excludes>
-                </configuration>
             </plugin>
         </plugins>
     </build>

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java?rev=1403685&r1=1403684&r2=1403685&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/Activator.java Tue Oct 30 13:10:20 2012
@@ -24,64 +24,29 @@ import org.osgi.framework.BundleActivato
 import org.osgi.framework.BundleContext;
 
 public class Activator implements BundleActivator {
-
     private static final Logger LOG = Logger.getLogger(Activator.class.getName());
 
     private TopologyManager topManager;
     private TopologyManagerImport topManagerImport;
 
-    private RemoteServiceAdminList remoteServiceAdminList;
-    
-    protected RemoteServiceAdminListenerImpl remoteServiceAdminListener;
-    
-    // separated for testing
-    protected TopologyManager createTopologyManager(BundleContext bc,RemoteServiceAdminList rl) {
-        return new TopologyManager(bc,rl);
-    }
+    private RemoteServiceAdminTracker rsaTracker;
 
- // separated for testing
-    protected TopologyManagerImport createTopologyManagerImport(BundleContext bc,RemoteServiceAdminList rl) {
-        return new TopologyManagerImport(bc,rl);
-    }
-    
- // separated for testing
-    protected RemoteServiceAdminList createRemoteServiceAdminList(BundleContext bc) {
-        return new RemoteServiceAdminList(bc);
-    }
-
-    // separated for testing
-    protected RemoteServiceAdminListenerImpl createRemoteServiceAdminListenerImpl(BundleContext bc,TopologyManager topManager,TopologyManagerImport topManagerImport) {
-        return new RemoteServiceAdminListenerImpl(bc, topManager, topManagerImport);
-    }
-    
     public void start(BundleContext bc) throws Exception {
         LOG.fine("TopologyManager: start()");
-        remoteServiceAdminList = createRemoteServiceAdminList(bc);
-        topManager = createTopologyManager(bc,remoteServiceAdminList);
-        topManagerImport = createTopologyManagerImport(bc,remoteServiceAdminList);
-
-        remoteServiceAdminList.setTopologyManager(topManager);
-        remoteServiceAdminList.setTopologyManagerImport(topManagerImport);
-        
-        remoteServiceAdminListener = createRemoteServiceAdminListenerImpl(bc, topManager, topManagerImport);        
-        
-        remoteServiceAdminListener.start();
-        
+        rsaTracker = new RemoteServiceAdminTracker(bc);
+        topManager = new TopologyManager(bc, rsaTracker);
+        topManagerImport = new TopologyManagerImport(bc, rsaTracker);
+
+        rsaTracker.open();
         topManager.start();
-        
-        remoteServiceAdminList.start();
-        
         topManagerImport.start();
-        
-        
     }
 
     public void stop(BundleContext bc) throws Exception {
         LOG.fine("TopologyManager: stop()");
         topManager.stop();
         topManagerImport.stop();
-        remoteServiceAdminList.stop();
-        remoteServiceAdminListener.stop();
+        rsaTracker.close();
     }
 
 }

Copied: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerManager.java (from r1401717, cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerImpl.java)
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerManager.java?p2=cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerManager.java&p1=cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerImpl.java&r1=1401717&r2=1403685&rev=1403685&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerImpl.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerManager.java Tue Oct 30 13:10:20 2012
@@ -20,38 +20,37 @@ package org.apache.cxf.dosgi.topologyman
 
 import java.util.ArrayList;
 import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.List;
-import java.util.Properties;
 import java.util.logging.Logger;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 
-// TODO: realize as ServiceFactory ! 
-public class EndpointListenerImpl implements EndpointListener {
+/**
+ * Manages an EndpointListener and adjusts its scope according to the requested service filters
+ */
+public class EndpointListenerManager {
 
-    private static final Logger LOG = Logger.getLogger(EndpointListenerImpl.class.getName());
+    private static final Logger LOG = Logger.getLogger(EndpointListenerManager.class.getName());
     
     private final BundleContext bctx;
     private ServiceRegistration serviceRegistration;
     private List<String> filters = new ArrayList<String>();
-    private TopologyManagerImport topManager;
+    private EndpointListener endpointListener;
 
-    protected EndpointListenerImpl(BundleContext bc, TopologyManagerImport tm) {
-        bctx = bc;
-        topManager = tm;
+    protected EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
+        this.bctx = bc;
+        this.endpointListener = endpointListener;
     }
 
     protected void start() {
-        serviceRegistration = bctx.registerService(EndpointListener.class.getName(), this,
+        serviceRegistration = bctx.registerService(EndpointListener.class.getName(), endpointListener,
                                                    getRegistrationProperties());
     }
-
-
-
-    protected void stop() {
+    
+    public void stop() {
         serviceRegistration.unregister();
     }
 
@@ -68,8 +67,6 @@ public class EndpointListenerImpl implem
         updateRegistration();
     }
 
-   
-
     protected void reduceScope(String filter) {
         if (filter == null)
             return;
@@ -82,8 +79,8 @@ public class EndpointListenerImpl implem
         updateRegistration();
     }
 
-    private Dictionary getRegistrationProperties() {
-        Properties p = new Properties();
+    private Dictionary<String, Object> getRegistrationProperties() {
+        Dictionary<String, Object> p = new Hashtable<String, Object>();
 
         synchronized (filters) {
             LOG.finer("EndpointListener: current filter: " + filters);
@@ -97,26 +94,7 @@ public class EndpointListenerImpl implem
     private void updateRegistration() {
         // This tends to be verbose.
         LOG.finer("EndpointListenerImpl: filters: " + filters);
-
         serviceRegistration.setProperties(getRegistrationProperties());
     }
 
-    public void endpointAdded(EndpointDescription epd, String filter) {
-        LOG.fine("EndpointListenerImpl: EndpointAdded() filter:"+filter+"  EndpointDesc:"+epd);
-        
-        if(filter==null){
-            LOG.severe("Endpoint is not handled because no matching filter was provided! Filter: "+filter);
-            return;
-        }
-        // Decide if it is worth it ? 
-        
-        topManager.addImportableService(filter,epd);
-        
-    }
-
-    public void endpointRemoved(EndpointDescription epd, String filter) {
-        LOG.fine("EndpointListenerImpl: EndpointRemoved() -> "+epd);
-        topManager.removeImportableService(filter, epd);
-    }
-
 }

Added: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerNotifier.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerNotifier.java?rev=1403685&view=auto
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerNotifier.java (added)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerNotifier.java Tue Oct 30 13:10:20 2012
@@ -0,0 +1,225 @@
+/** 
+  * Licensed to the Apache Software Foundation (ASF) under one 
+  * or more contributor license agreements. See the NOTICE file 
+  * distributed with this work for additional information 
+  * regarding copyright ownership. The ASF licenses this file 
+  * to you under the Apache License, Version 2.0 (the 
+  * "License"); you may not use this file except in compliance 
+  * with the License. You may obtain a copy of the License at 
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0 
+  * 
+  * Unless required by applicable law or agreed to in writing, 
+  * software distributed under the License is distributed on an 
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+  * KIND, either express or implied. See the License for the 
+  * specific language governing permissions and limitations 
+  * under the License. 
+  */
+package org.apache.cxf.dosgi.topologymanager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.ExportReference;
+import org.osgi.service.remoteserviceadmin.ExportRegistration;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * Tracks EndpointListeners and allows notifying them of endpoints
+ */
+public class EndpointListenerNotifier {
+    private final static Logger LOG = Logger.getLogger(EndpointListenerNotifier.class.getName());
+    private BundleContext bctx;
+    private ServiceTracker stEndpointListeners;
+    private ExportRepository exportRepository;
+
+    public EndpointListenerNotifier(BundleContext bctx, ExportRepository exportRepository) {
+        this.bctx = bctx;
+        this.exportRepository = exportRepository;
+        this.stEndpointListeners = new ServiceTracker(bctx, EndpointListener.class.getName(), null) {
+            @Override
+            public Object addingService(ServiceReference reference) {
+                LOG.fine("TopologyManager: new EndpointListener that wants to be informed about whats going on ... ");
+                notifyListenerOfAllExistingExports(reference);
+                return super.addingService(reference);
+            }
+
+            @Override
+            public void modifiedService(ServiceReference reference, Object service) {
+                LOG.fine("TopologyManager: EndpointListener changed ... ");
+                notifyListenerOfAllExistingExports(reference);
+                super.modifiedService(reference, service);
+            }
+
+        };
+
+    }
+    
+    public void start() {
+        stEndpointListeners.open();
+    }
+
+    public void stop() {
+        stEndpointListeners.close();
+    }
+    
+    private void notifyListenerOfAllExistingExports(
+            ServiceReference reference) {
+        Collection<ExportRegistration> registrations = exportRepository.getAllExportRegistrations();
+        notifyListenerOfAdding(reference, registrations );
+    }
+    
+    void nofifyEndpointListenersOfAdding(Collection<ExportRegistration> exportRegistrations) {
+        ServiceReference[] epListeners = getEndpointListeners(bctx);
+        for (ServiceReference sref : epListeners) {
+            notifyListenerOfAdding(sref, exportRegistrations);
+        }
+    }
+    
+    void notifyAllListenersOfRemoval(Collection<ExportRegistration> endpoints) {
+        ServiceReference[] refs = getEndpointListeners(bctx);
+        for (ServiceReference epListenerReference : refs) {
+            notifyListenersOfRemoval(epListenerReference, endpoints);
+        }
+    }
+    
+    void notifyListenersOfRemoval(Collection<ExportRegistration> registrations) {
+        for (ServiceReference epListenerReference : stEndpointListeners.getServiceReferences()) {
+            notifyListenersOfRemoval(epListenerReference, registrations);
+        }
+    }
+    
+    /**
+     * Notifies the listener if it is interested in the provided registrations
+     * 
+     * @param epListenerReference The ServiceReference for an EndpointListener
+     * @param exportRegistrations the registrations, the listener should be informed about
+     */
+    private void notifyListenerOfAdding(ServiceReference epListenerReference,
+                                        Collection<ExportRegistration> exportRegistrations) {
+        EndpointListener epl = (EndpointListener)bctx.getService(epListenerReference);
+        List<Filter> filters = getFiltersFromEndpointListenerScope(epListenerReference, bctx);
+
+        LOG.finer("TopologyManager: notifyListenerOfAddingIfAppropriate() ");
+        for (ExportRegistration exReg : exportRegistrations) {
+            EndpointDescription endpoint = getExportedEndpoint(exReg);
+            List<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
+            for (Filter filter : matchingFilters) {
+                epl.endpointAdded(endpoint, filter.toString());
+            }
+        }
+
+    }
+
+    void notifyListenersOfRemoval(ServiceReference epListenerReference,
+                                          Collection<ExportRegistration> exportRegistrations) {
+        EndpointListener epl = (EndpointListener)bctx.getService(epListenerReference);
+        List<Filter> filters = getFiltersFromEndpointListenerScope(epListenerReference, bctx);
+        LOG.finer("TopologyManager: notifyListenerOfREMOVALIfAppropriate() ");
+        for (ExportRegistration exReg : exportRegistrations) {
+            EndpointDescription endpoint = getExportedEndpoint(exReg);
+            List<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
+            for (Filter filter : matchingFilters) {
+                epl.endpointRemoved(endpoint, filter.toString());
+            }
+        }
+    }
+    
+    static List<Filter> getFiltersFromEndpointListenerScope(ServiceReference sref,BundleContext bctx) {
+        List<Filter> filters = new ArrayList<Filter>();
+        try {
+            Object fo = sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE);
+            if (fo instanceof String) {
+                filters.add(bctx.createFilter((String) fo));
+            } else if (fo instanceof String[]) {
+                String[] foArray = (String[]) fo;
+                for (String f : foArray) {
+                    filters.add(bctx.createFilter(f));
+                }
+            } else if (fo instanceof Collection) {
+                @SuppressWarnings("rawtypes")
+                Collection c = (Collection) fo;
+                for (Object o : c) {
+                    if (o instanceof String) {
+                        filters.add(bctx.createFilter((String) o));
+                    } else {
+                        LOG.warning("Component of a filter is not a string -> skipped !");
+                    }
+                }
+            }
+        } catch (InvalidSyntaxException e) {
+            LOG.log(Level.SEVERE, e.getMessage(), e);
+        }
+        return filters;
+    }
+    
+    private List<Filter> getMatchingFilters(List<Filter> filters,
+            EndpointDescription endpoint) {
+        List<Filter> matchingFilters = new ArrayList<Filter>();
+        Dictionary<String, Object> d = getEndpointProperties(endpoint);
+
+        for (Filter filter : filters) {
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Matching: " + filter + "  against " + d);
+            }
+            if (filter.match(d)) {
+                LOG.fine("Listener matched one of the Endpoints !!!! --> calling removed() ...");
+                matchingFilters.add(filter);
+            }
+        }
+        return matchingFilters;
+    }
+
+    /** 
+     * Find all EndpointListeners; they must have the scope property, otherwise they have to be ignored
+     * @param bctx
+     * @return
+     * @throws InvalidSyntaxException
+     */
+   private static ServiceReference[] getEndpointListeners(BundleContext bctx) {
+       ServiceReference[] result = null;
+       try {
+           String filter = "(" + EndpointListener.ENDPOINT_LISTENER_SCOPE + "=*)";
+           result = bctx.getServiceReferences(EndpointListener.class.getName(), filter);
+       } catch (InvalidSyntaxException e) {
+           LOG.log(Level.SEVERE, e.getMessage(), e);
+       }
+       return (result == null) ? new ServiceReference[]{} : result;
+   }
+   
+   /**
+    * Retrieve exported Endpoint while handling null
+    * @param exReg
+    * @return exported Endpoint or null if not present
+    */
+   private EndpointDescription getExportedEndpoint(ExportRegistration exReg) {
+       ExportReference ref = (exReg == null) ? null : exReg.getExportReference();
+       return (ref == null) ? null : ref.getExportedEndpoint(); 
+   }
+   
+   /**
+    * Retrieve endpoint properties as Dictionary
+    * 
+    * @param ep
+    * @return endpoint properties (will never return null) 
+    */
+   private Dictionary<String, Object> getEndpointProperties(EndpointDescription ep) {
+       if (ep == null || ep.getProperties() == null) {
+           return new Hashtable<String, Object>();
+       } else {
+           return new Hashtable<String, Object>(ep.getProperties());
+       }
+   }
+}

Propchange: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/EndpointListenerNotifier.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ExportRepository.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ExportRepository.java?rev=1403685&view=auto
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ExportRepository.java (added)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ExportRepository.java Tue Oct 30 13:10:20 2012
@@ -0,0 +1,27 @@
+/** 
+  * Licensed to the Apache Software Foundation (ASF) under one 
+  * or more contributor license agreements. See the NOTICE file 
+  * distributed with this work for additional information 
+  * regarding copyright ownership. The ASF licenses this file 
+  * to you under the Apache License, Version 2.0 (the 
+  * "License"); you may not use this file except in compliance 
+  * with the License. You may obtain a copy of the License at 
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0 
+  * 
+  * Unless required by applicable law or agreed to in writing, 
+  * software distributed under the License is distributed on an 
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+  * KIND, either express or implied. See the License for the 
+  * specific language governing permissions and limitations 
+  * under the License. 
+  */
+package org.apache.cxf.dosgi.topologymanager;
+
+import java.util.Collection;
+
+import org.osgi.service.remoteserviceadmin.ExportRegistration;
+
+public interface ExportRepository {
+    Collection<ExportRegistration> getAllExportRegistrations();
+}

Propchange: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ExportRepository.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ListenerHookImpl.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ListenerHookImpl.java?rev=1403685&r1=1403684&r2=1403685&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ListenerHookImpl.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ListenerHookImpl.java Tue Oct 30 13:10:20 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.dosgi.topologyman
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
@@ -28,14 +29,17 @@ import java.util.regex.Pattern;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
 
+/**
+ * Listens for service listeners and informs ServiceInterestListener about added and removed interest
+ * in services
+ */
 public class ListenerHookImpl implements ListenerHook {
     private static final Logger LOG = Logger.getLogger(ListenerHookImpl.class.getName());
     private BundleContext bctx;
-    private ServiceRegistration serviceRegistrations;
-    private TopologyManagerImport tm;
+    private ServiceInterestListener serviceInterestListener;
 
     private final static String CLASS_NAME_EXPRESSION = ".*\\(" + Constants.OBJECTCLASS
                                                         + "=([a-zA-Z_0-9.]+)\\).*";
@@ -54,21 +58,13 @@ public class ListenerHookImpl implements
         SYSTEM_PACKAGES.add("java.net.ContentHandler");
     }
 
-    public ListenerHookImpl(BundleContext bc, TopologyManagerImport tm) {
+    public ListenerHookImpl(BundleContext bc, ServiceInterestListener serviceInterestListener) {
         bctx = bc;
-        this.tm = tm;
+        this.serviceInterestListener = serviceInterestListener;
     }
 
-    protected void start() {
-        // TODO: properties ?
-        serviceRegistrations = bctx.registerService(ListenerHook.class.getName(), this, null);
-    }
-
-    protected void stop() {
-        serviceRegistrations.unregister();
-    }
-
-    public void added(Collection listeners) {
+    @SuppressWarnings("rawtypes")
+    public void added(Collection/* <ListenerInfo> */ listeners) {
         if (LOG.isLoggable(Level.FINE)) {
             LOG.fine("ListenerHookImpl: added() " + listeners);
         }
@@ -93,14 +89,15 @@ public class ListenerHookImpl implements
                                    + className + "]");
                 continue;
             }
-
-            tm.addServiceInterest(listenerInfo.getFilter());
+            String exFilter = extendFilter(listenerInfo.getFilter(), bctx);
+            serviceInterestListener.addServiceInterest(exFilter);
 
         }
 
     }
 
-    public void removed(Collection listeners) {
+    @SuppressWarnings("rawtypes")
+    public void removed(Collection/* <ListenerInfo> */ listeners) {
         if (LOG.isLoggable(Level.FINE)) {
             LOG.fine("ListenerHookImpl: removed: " + listeners);
         }
@@ -110,8 +107,8 @@ public class ListenerHookImpl implements
             LOG.fine("*** Filter: " + listenerInfo.getFilter());
 
             // TODO: determine if service was handled ? 
-
-            tm.removeServiceInterest(listenerInfo.getFilter());
+            String exFilter = extendFilter(listenerInfo.getFilter(), bctx);
+            serviceInterestListener.removeServiceInterest(exFilter);
 
         }
 
@@ -134,11 +131,25 @@ public class ListenerHookImpl implements
 
         for (String p : SYSTEM_PACKAGES) {
             if (className.startsWith(p)) {
-                LOG.fine("Lookup for " + className + " is ignored");
                 return true;
             }
         }
         return false;
     }
 
+    static String getUUID(BundleContext bctx) {
+        synchronized ("org.osgi.framework.uuid") {
+            String uuid = bctx.getProperty("org.osgi.framework.uuid");
+            if(uuid==null){
+                uuid = UUID.randomUUID().toString();
+                System.setProperty("org.osgi.framework.uuid", uuid);
+            }
+            return uuid;
+        }
+    }
+    
+    static String extendFilter(String filter, BundleContext bctx) {
+        return "(&" + filter + "(!(" + RemoteConstants.ENDPOINT_FRAMEWORK_UUID + "=" + getUUID(bctx) + ")))";
+    }
+
 }

Added: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RefManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RefManager.java?rev=1403685&view=auto
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RefManager.java (added)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RefManager.java Tue Oct 30 13:10:20 2012
@@ -0,0 +1,64 @@
+/** 
+  * Licensed to the Apache Software Foundation (ASF) under one 
+  * or more contributor license agreements. See the NOTICE file 
+  * distributed with this work for additional information 
+  * regarding copyright ownership. The ASF licenses this file 
+  * to you under the Apache License, Version 2.0 (the 
+  * "License"); you may not use this file except in compliance 
+  * with the License. You may obtain a copy of the License at 
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0 
+  * 
+  * Unless required by applicable law or agreed to in writing, 
+  * software distributed under the License is distributed on an 
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+  * KIND, either express or implied. See the License for the 
+  * specific language governing permissions and limitations 
+  * under the License. 
+  */
+package org.apache.cxf.dosgi.topologymanager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Manages reference counters to keys.
+ * <p>
+ * Every access to the internal map happens while holding the map's own monitor.
+ */
+public class RefManager {
+    Map<String, Integer> refMap = new HashMap<String, Integer>();
+
+    /**
+     * Increases the reference count for this key.
+     *
+     * @param key the key to add a reference to
+     * @return the reference count of the key after the increment (1 for a key seen for the first time)
+     */
+    public int addReference(String key) {
+        synchronized (refMap) {
+            Integer current = refMap.get(key);
+            // Compute the new count first and store that value; storing a post-incremented
+            // variable would put the stale count into the map and the counter would never grow.
+            int updated = (current == null) ? 1 : current + 1;
+            refMap.put(key, updated);
+            return updated;
+        }
+    }
+
+    /**
+     * Decreases the reference count for this key. The key is dropped from the map as soon as its
+     * count is no longer positive.
+     *
+     * @param key the key to remove a reference from
+     * @return the remaining reference count; 0 when the last reference was removed or when the
+     *         key is not tracked at all
+     */
+    public int removeReference(String key) {
+        synchronized (refMap) {
+            Integer current = refMap.get(key);
+            if (current == null) {
+                // Unknown key: nothing to release (unboxing null here would throw).
+                return 0;
+            }
+            int updated = current - 1;
+            if (updated > 0) {
+                refMap.put(key, updated);
+            } else {
+                refMap.remove(key);
+            }
+            return updated;
+        }
+    }
+}

Propchange: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RefManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminLifeCycleListener.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminLifeCycleListener.java?rev=1403685&view=auto
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminLifeCycleListener.java (added)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminLifeCycleListener.java Tue Oct 30 13:10:20 2012
@@ -0,0 +1,29 @@
+/** 
+  * Licensed to the Apache Software Foundation (ASF) under one 
+  * or more contributor license agreements. See the NOTICE file 
+  * distributed with this work for additional information 
+  * regarding copyright ownership. The ASF licenses this file 
+  * to you under the Apache License, Version 2.0 (the 
+  * "License"); you may not use this file except in compliance 
+  * with the License. You may obtain a copy of the License at 
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0 
+  * 
+  * Unless required by applicable law or agreed to in writing, 
+  * software distributed under the License is distributed on an 
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+  * KIND, either express or implied. See the License for the 
+  * specific language governing permissions and limitations 
+  * under the License. 
+  */
+package org.apache.cxf.dosgi.topologymanager;
+
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+
+/**
+ * Callback interface to be notified of RemoteServiceAdmin services that are added or removed
+ */
+public interface RemoteServiceAdminLifeCycleListener {
+    /**
+     * Called when a RemoteServiceAdmin service was added.
+     *
+     * @param rsa the RemoteServiceAdmin that was added
+     */
+    void added(RemoteServiceAdmin rsa);
+    /**
+     * Called when a RemoteServiceAdmin service was removed.
+     *
+     * @param rsa the RemoteServiceAdmin that was removed
+     */
+    void removed(RemoteServiceAdmin rsa);
+}

Propchange: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminLifeCycleListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminTracker.java (from r1401717, cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminList.java)
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminTracker.java?p2=cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminTracker.java&p1=cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminList.java&r1=1401717&r2=1403685&rev=1403685&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminList.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/RemoteServiceAdminTracker.java Tue Oct 30 13:10:20 2012
@@ -1,113 +1,72 @@
 /** 
-  * Licensed to the Apache Software Foundation (ASF) under one 
-  * or more contributor license agreements. See the NOTICE file 
-  * distributed with this work for additional information 
-  * regarding copyright ownership. The ASF licenses this file 
-  * to you under the Apache License, Version 2.0 (the 
-  * "License"); you may not use this file except in compliance 
-  * with the License. You may obtain a copy of the License at 
-  * 
-  * http://www.apache.org/licenses/LICENSE-2.0 
-  * 
-  * Unless required by applicable law or agreed to in writing, 
-  * software distributed under the License is distributed on an 
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
-  * KIND, either express or implied. See the License for the 
-  * specific language governing permissions and limitations 
-  * under the License. 
-  */
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file 
+ * distributed with this work for additional information 
+ * regarding copyright ownership. The ASF licenses this file 
+ * to you under the Apache License, Version 2.0 (the 
+ * "License"); you may not use this file except in compliance 
+ * with the License. You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ * 
+ * Unless required by applicable law or agreed to in writing, 
+ * software distributed under the License is distributed on an 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied. See the License for the 
+ * specific language governing permissions and limitations 
+ * under the License. 
+ */
 package org.apache.cxf.dosgi.topologymanager;
 
 import java.util.ArrayList;
-import java.util.logging.Logger;
+import java.util.List;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
 import org.osgi.util.tracker.ServiceTracker;
 
-public class RemoteServiceAdminList extends ArrayList<RemoteServiceAdmin>{
-
-    private BundleContext bctx;
-    private ServiceTracker stRemoteServiceAdmin;
-    private TopologyManager topManager;
-    private TopologyManagerImport topManagerImport;
-    
-    private final static Logger LOG = Logger.getLogger(RemoteServiceAdminList.class.getName()); 
-    
-    
-    public RemoteServiceAdminList(BundleContext bc) {
-
-        bctx = bc;
-        
-        final RemoteServiceAdminList rsal = this;
-        
-        stRemoteServiceAdmin = new ServiceTracker(bctx, RemoteServiceAdmin.class.getName(), null) {
-            @Override
-            public Object addingService(ServiceReference reference) {
-                LOG.info("Adding RemoteServiceAdmin to list of admins ");
-                RemoteServiceAdmin rsa = (RemoteServiceAdmin)bctx.getService(reference);
-                synchronized (rsal) {
-                    rsal.add(rsa);
-                }
-                LOG.info("enlisted RemoteEventAdmins: " + this.size());
-
-                triggerExportImportForRemoteServiceAdmin(rsa);
-
-                return super.addingService(reference);
-            }
-
-            @Override
-            public void removedService(ServiceReference reference, Object service) {
-                LOG.info("TopologyManager: Removing RemoteServiceAdmin from list of admins ");
-                synchronized (rsal) {
-                    rsal.remove(service);
-                }
-
-                // TODO: remove service exports from management structure and notify discovery stuff...
-                removeRemoteServiceAdmin((RemoteServiceAdmin)service);
-
-                LOG.info("TopologyManager: enlisted RemoteEventAdmins: " + rsal.size());
-
-                super.removedService(reference, service);
+/**
+ * Provides a list of RemoteServiceAdmin services and triggers RemoteServiceAdminLifeCycleListeners
+ * when RemoteServiceAdmin services are added or removed
+ */
+public class RemoteServiceAdminTracker extends ServiceTracker {
+    private List<RemoteServiceAdminLifeCycleListener> listeners;
+
+    /**
+     * Creates a tracker for services registered under the RemoteServiceAdmin class name, without
+     * a customizer and with an initially empty listener list.
+     *
+     * @param bc the bundle context the tracker operates on
+     */
+    public RemoteServiceAdminTracker(BundleContext bc) {
+        super(bc, RemoteServiceAdmin.class.getName(), null);
+        listeners = new ArrayList<RemoteServiceAdminLifeCycleListener>();
+    }
+    
+    /**
+     * Appends a listener to the list that is notified from the tracker callbacks. The listener is
+     * only stored here; it is not invoked for services the tracker already knows about.
+     *
+     * @param listener the listener to notify about added and removed RemoteServiceAdmin services
+     */
+    public void addListener(RemoteServiceAdminLifeCycleListener listener) {
+        this.listeners.add(listener);
+    }
+
+    /**
+     * Obtains the newly registered RemoteServiceAdmin once and announces it to all listeners.
+     *
+     * @param reference reference to the service being added to the tracker
+     * @return the service object to track, or null if the service could not be obtained
+     */
+    @Override
+    public Object addingService(ServiceReference reference) {
+        // Acquire the service exactly once through the tracker: every additional
+        // context.getService() call raises the use count, while the tracker releases it only once.
+        RemoteServiceAdmin rsa = (RemoteServiceAdmin) super.addingService(reference);
+        if (rsa == null) {
+            // Service vanished before it could be obtained; nothing to track or announce.
+            return null;
+        }
+        for (RemoteServiceAdminLifeCycleListener listener : listeners) {
+            listener.added(rsa);
+        }
+        return rsa;
+    }
+
+    /**
+     * Announces the removal of a tracked RemoteServiceAdmin to all listeners and then lets the
+     * tracker release the service.
+     *
+     * @param reference reference to the service that was removed
+     * @param service the service object that was tracked for the reference
+     */
+    @Override
+    public void removedService(ServiceReference reference, Object service) {
+        // Use the tracked object handed in by the tracker. Looking the service up again via
+        // context.getService() on a reference that is going away may yield null and would
+        // increase the use count once more per listener.
+        RemoteServiceAdmin rsa = (RemoteServiceAdmin) service;
+        for (RemoteServiceAdminLifeCycleListener listener : listeners) {
+            listener.removed(rsa);
+        }
+        super.removedService(reference, service);
+    }
+
+    public List<RemoteServiceAdmin> getList() {
+        ArrayList<RemoteServiceAdmin> list = new ArrayList<RemoteServiceAdmin>();
+        ServiceReference[] refs = getServiceReferences();
+        if (refs != null) {
+            for (ServiceReference ref : refs) {
+                list.add((RemoteServiceAdmin) context.getService(ref));
             }
-        };
-    
-    
-        
-        
-    } 
-
-    
-    protected void removeRemoteServiceAdmin(RemoteServiceAdmin service) {
-        topManager.removeRemoteServiceAdmin(service);
+        }
+        return list;
     }
 
-
-    protected void triggerExportImportForRemoteServiceAdmin(RemoteServiceAdmin rsa) {
-        topManager.triggerExportImportForRemoteServiceAdmin(rsa);
-        topManagerImport.triggerExportImportForRemoteServiceAdmin(rsa);
-    }
-
-
-    public void start(){
-        stRemoteServiceAdmin.open();
-    }
-
-    
-    public void stop(){
-        stRemoteServiceAdmin.close();
-    }
-
-
-    public void setTopologyManager(TopologyManager tm) {
-        topManager = tm;
-    }
-
-
-    public void setTopologyManagerImport(TopologyManagerImport tmi) {
-        topManagerImport = tmi;
-    }
-    
-    
 }

Added: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ServiceInterestListener.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ServiceInterestListener.java?rev=1403685&view=auto
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ServiceInterestListener.java (added)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ServiceInterestListener.java Tue Oct 30 13:10:20 2012
@@ -0,0 +1,27 @@
+/** 
+  * Licensed to the Apache Software Foundation (ASF) under one 
+  * or more contributor license agreements. See the NOTICE file 
+  * distributed with this work for additional information 
+  * regarding copyright ownership. The ASF licenses this file 
+  * to you under the Apache License, Version 2.0 (the 
+  * "License"); you may not use this file except in compliance 
+  * with the License. You may obtain a copy of the License at 
+  * 
+  * http://www.apache.org/licenses/LICENSE-2.0 
+  * 
+  * Unless required by applicable law or agreed to in writing, 
+  * software distributed under the License is distributed on an 
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+  * KIND, either express or implied. See the License for the 
+  * specific language governing permissions and limitations 
+  * under the License. 
+  */
+package org.apache.cxf.dosgi.topologymanager;
+
+/**
+ * Callback interface that receives service interests, expressed as filter strings, when they
+ * are added or removed.
+ */
+public interface ServiceInterestListener {
+
+    /**
+     * Signals that an interest described by the given filter was added.
+     *
+     * @param filter the filter string describing the interest
+     */
+    void addServiceInterest(String filter);
+
+    /**
+     * Signals that an interest described by the given filter was removed.
+     *
+     * @param filter the filter string describing the interest
+     */
+    void removeServiceInterest(String filter);
+
+}
\ No newline at end of file

Propchange: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/ServiceInterestListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManager.java?rev=1403685&r1=1403684&r2=1403685&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManager.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManager.java Tue Oct 30 13:10:20 2012
@@ -21,8 +21,6 @@ package org.apache.cxf.dosgi.topologyman
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,31 +32,29 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceEvent;
+import org.osgi.framework.ServiceListener;
 import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.ExportReference;
 import org.osgi.service.remoteserviceadmin.ExportRegistration;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
 import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
-import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdminEvent;
 
 /**
  * <li>This class keeps a list of currently imported and exported endpoints <li>It requests the import/export
  * from RemoteAdminServices
  */
-public class TopologyManager {
+public class TopologyManager implements ExportRepository {
 
     private final static Logger LOG = Logger.getLogger(TopologyManager.class.getName());
 
-    private ExecutorService execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS,
-                                                                 new LinkedBlockingQueue<Runnable>());
-
-    private final RemoteServiceAdminList remoteServiceAdminList;
-
-    private ServiceListenerImpl serviceListerner;
+    private final BundleContext bctx;
+    private final EndpointListenerNotifier epListenerNotifier;
+    private final ExecutorService execService;
+    private final RemoteServiceAdminTracker remoteServiceAdminTracker;
+    private final ServiceListener serviceListerner;
 
     /**
      * Holds all services that are exported by this TopologyManager for each ServiceReference that should be
@@ -81,46 +77,48 @@ public class TopologyManager {
                       Map<RemoteServiceAdmin, Collection<ExportRegistration>>> exportedServices = 
         new LinkedHashMap<ServiceReference, Map<RemoteServiceAdmin, Collection<ExportRegistration>>>();
 
-    private BundleContext bctx;
-
-    private ServiceTracker stEndpointListeners;
-
-    public TopologyManager(BundleContext ctx, final RemoteServiceAdminList rsaList) {
+    public TopologyManager(BundleContext ctx, RemoteServiceAdminTracker rsaTracker) {
+        execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
         bctx = ctx;
+        this.remoteServiceAdminTracker = rsaTracker;
+        this.remoteServiceAdminTracker.addListener(new RemoteServiceAdminLifeCycleListener() {
 
-        remoteServiceAdminList = rsaList;
-
-        stEndpointListeners = new ServiceTracker(ctx, EndpointListener.class.getName(), null) {
-            @Override
-            public Object addingService(ServiceReference reference) {
-                LOG.fine("TopologyManager: new EndpointListener that wants to be informed about whats going on ... ");
-
-                notify(reference);
-
-                return super.addingService(reference);
+            public void added(RemoteServiceAdmin rsa) {
+                triggerExportForRemoteServiceAdmin(rsa);
             }
 
-            @Override
-            public void modifiedService(ServiceReference reference, Object service) {
-                LOG.fine("TopologyManager: EndpointListener changed ... ");
-                notify(reference);
-                super.modifiedService(reference, service);
+            public void removed(RemoteServiceAdmin rsa) {
+                // TODO: remove service exports from management structure and notify
+                // discovery stuff...
+                removeRemoteServiceAdmin(rsa);
             }
-
-            private void notify(ServiceReference reference) {
-                synchronized (exportedServices) {
-                    for (Map<RemoteServiceAdmin, Collection<ExportRegistration>> exports : exportedServices.values()) {
-                        for (Collection<ExportRegistration> regs : exports.values()) {
-                            if (regs != null)
-                                notifyListenerOfAddingIfAppropriate(reference, regs);
-                        }
+        });
+        serviceListerner = new ServiceListener() {
+            public void serviceChanged(ServiceEvent event) {
+                LOG.fine("Received ServiceEvent: " + event);
+                ServiceReference sref = event.getServiceReference();
+                if (event.getType() == ServiceEvent.REGISTERED) {
+                    LOG.fine("Registered");
+                    if (shouldExportService(sref)) {
+                        exportService(sref);
                     }
+                } else if (event.getType() == ServiceEvent.UNREGISTERING) {
+                    removeService(sref);
                 }
             }
         };
-
-        serviceListerner = new ServiceListenerImpl(bctx, this);
-
+        
+        epListenerNotifier = new EndpointListenerNotifier(ctx, this);
+    }
+    
+    /**
+     * checks if a Service is intended to be exported
+     */
+    private boolean shouldExportService(ServiceReference sref) {
+        if (sref.getProperty(RemoteConstants.SERVICE_EXPORTED_INTERFACES) != null) {
+            return true;
+        }
+        return false;
     }
 
     protected void removeRemoteServiceAdmin(RemoteServiceAdmin rsa) {
@@ -132,10 +130,7 @@ public class TopologyManager {
                     Collection<ExportRegistration> endpoints = exports.getValue().get(rsa);
                     // TODO for each notify discovery......
 
-                    ServiceReference[] refs = getEndpointListeners(bctx);
-                    for (ServiceReference sref : refs) {
-                        notifyListenersOfRemovalIfAppropriate(sref, endpoints);
-                    }
+                    this.epListenerNotifier.notifyAllListenersOfRemoval(endpoints);
 
                     // remove all management information for the RemoteServiceAdmin
                     exports.getValue().remove(rsa);
@@ -144,36 +139,32 @@ public class TopologyManager {
         }
     }
 
-    protected void triggerExportImportForRemoteServiceAdmin(RemoteServiceAdmin rsa) {
+    protected void triggerExportForRemoteServiceAdmin(RemoteServiceAdmin rsa) {
         LOG.finer("TopologyManager: triggerExportImportForRemoteSericeAdmin()");
 
         synchronized (exportedServices) {
-            for (Map.Entry<ServiceReference, Map<RemoteServiceAdmin, Collection<ExportRegistration>>> exports : exportedServices
-                .entrySet()) {
-                if (exports.getValue().containsKey(rsa)) {
+            for (ServiceReference serviceRef : exportedServices.keySet()) {
+                Map<RemoteServiceAdmin, Collection<ExportRegistration>> rsaExports = exportedServices.get(serviceRef);
+                String bundleName = serviceRef.getBundle().getSymbolicName();
+                if (rsaExports.containsKey(rsa)) {
                     // already handled....
-                    LOG.fine("TopologyManager: service from bundle "
-                             + exports.getKey().getBundle().getSymbolicName()
-                             + "is already handled by this RSA");
+                    LOG.fine("TopologyManager: service from bundle " + bundleName + "is already handled by this RSA");
                 } else {
                     // trigger export of this service....
-                    LOG.fine("TopologyManager: service from bundle "
-                             + exports.getKey().getBundle().getSymbolicName()
-                             + " is to be exported by this RSA");
-                    triggerExport(exports.getKey());
+                    LOG.fine("TopologyManager: service from bundle " + bundleName + " is to be exported by this RSA");
+                    exportService(serviceRef);
                 }
-
             }
         }
 
     }
 
     public void start() {
-        stEndpointListeners.open();
-        serviceListerner.start();
-
+        epListenerNotifier.start();
+        bctx.addServiceListener(serviceListerner);
+        remoteServiceAdminTracker.open();
         try {
-            checkExistingServices();
+            exportExistingServices();
         } catch (InvalidSyntaxException e) {
             LOG.log(Level.FINER, "Failed to check existing services.", e);
         }
@@ -181,8 +172,9 @@ public class TopologyManager {
 
     public void stop() {
         execService.shutdown();
-        stEndpointListeners.close();
-        serviceListerner.stop();
+        remoteServiceAdminTracker.close();
+        bctx.removeServiceListener(serviceListerner);
+        epListenerNotifier.stop();
     }
 
     void removeService(ServiceReference sref) {
@@ -192,7 +184,7 @@ public class TopologyManager {
                 for (Map.Entry<RemoteServiceAdmin, Collection<ExportRegistration>> entry : rsas.entrySet()) {
                     if (entry.getValue() != null) {
                     	Collection<ExportRegistration> registrations = entry.getValue();
-                        notifyListenersOfRemovalIfAppropriate(registrations);
+                    	this.epListenerNotifier.notifyListenersOfRemoval(registrations);
                         for (ExportRegistration exReg : registrations) {
                             if (exReg != null) {
                                  exReg.close();
@@ -205,15 +197,8 @@ public class TopologyManager {
             }
         }
     }
-
-    private void notifyListenersOfRemovalIfAppropriate(Collection<ExportRegistration> registrations) {
-    	for (ServiceReference endpointReference : stEndpointListeners.getServiceReferences()) {
-    	    notifyListenersOfRemovalIfAppropriate(endpointReference, registrations);
-    	}
-    }
     
     protected void exportService(ServiceReference sref) {
-
         // add to local list of services that should/are be exported
         synchronized (exportedServices) {
             LOG.info("TopologyManager: adding service to exportedServices list to export it --- from bundle:  "
@@ -221,171 +206,83 @@ public class TopologyManager {
             exportedServices.put(sref,
                                  new LinkedHashMap<RemoteServiceAdmin, Collection<ExportRegistration>>());
         }
-
         triggerExport(sref);
-
     }
 
     private void triggerExport(final ServiceReference sref) {
         execService.execute(new Runnable() {
             public void run() {
-                LOG.finer("TopologyManager: exporting service ...");
-
-                Map<RemoteServiceAdmin, Collection<ExportRegistration>> exports = null;
-
-                synchronized (exportedServices) {
-                    exports = Collections.synchronizedMap(exportedServices.get(sref));
-                }
-                // FIXME: Not thread safe...?
-                if (exports != null) {
-                    if(remoteServiceAdminList == null || remoteServiceAdminList.size() == 0) {
-                        LOG.log(Level.SEVERE, "No RemoteServiceAdmin available! Unable to export service from bundle {0}, interfaces: {1}",
-                                new Object[]{sref.getBundle().getSymbolicName(), sref.getProperty(org.osgi.framework.Constants.OBJECTCLASS)});
-                    }
-
-                    synchronized (remoteServiceAdminList) {
-                        for (final RemoteServiceAdmin remoteServiceAdmin : remoteServiceAdminList) {
-                            LOG
-                                .info("TopologyManager: handling remoteServiceAdmin "
-                                      + remoteServiceAdmin);
-
-                            if (exports.containsKey(remoteServiceAdmin)) {
-                                // already handled by this remoteServiceAdmin
-                                LOG.fine("TopologyManager: already handled by this remoteServiceAdmin -> skipping");
-                            } else {
-                                // TODO: additional parameter Map ?
-                                LOG.fine("TopologyManager: exporting ...");
-                                Collection<ExportRegistration> endpoints = remoteServiceAdmin
-                                    .exportService(sref, null);
-                                if (endpoints == null) {
-                                    // TODO export failed -> What should be done here?
-                                    LOG.severe("TopologyManager: export failed");
-                                    exports.put(remoteServiceAdmin, null);
-                                } else {
-                                    LOG.info("TopologyManager: export sucessful Endpoints:" + endpoints);
-                                    // enqueue in local list of endpoints
-                                    exports.put(remoteServiceAdmin, endpoints);
-
-                                    nofifyEndpointListenersOfAdding(endpoints);
-                                }
-                            }
-                        }
-                    }
-                }
+                doExportService(sref);
             }
-
         });
     }
 
-    protected void nofifyEndpointListenersOfAdding(Collection<ExportRegistration> exportRegistrations) {
-        ServiceReference[] epListeners = getEndpointListeners(bctx);
-        for (ServiceReference sref : epListeners) {
-            notifyListenerOfAddingIfAppropriate(sref, exportRegistrations);
-        }
-    }
+    private void doExportService(final ServiceReference sref) {
+        LOG.finer("TopologyManager: exporting service ...");
 
-     /** 
-      * Find all EndpointListeners; They must have the Scope property otherwise they have to be ignored
-      * @param bctx
-      * @return
-      * @throws InvalidSyntaxException
-      */
-    protected static ServiceReference[] getEndpointListeners(BundleContext bctx) {
-        ServiceReference[] result = null;
-        try {
-            String filter = "(" + EndpointListener.ENDPOINT_LISTENER_SCOPE + "=*)";
-            result = bctx.getServiceReferences(EndpointListener.class.getName(), filter);
-        } catch (InvalidSyntaxException e) {
-            LOG.log(Level.SEVERE, e.getMessage(), e);
-        }
-        return (result == null) ? new ServiceReference[]{} : result;
-    }
+        Map<RemoteServiceAdmin, Collection<ExportRegistration>> exports = null;
 
-    /**
-     * Notifies the listener if he is interested in the provided registrations
-     * 
-     * @param sref The ServiceReference for an EndpointListener
-     * @param exportRegistrations the registrations, the listener should be informed about
-     */
-    protected void notifyListenerOfAddingIfAppropriate(ServiceReference sref,
-                                                       Collection<ExportRegistration> exportRegistrations) {
-
-        EndpointListener epl = (EndpointListener)bctx.getService(sref);
-        LOG.finer("TopologyManager: notifyListenerOfAddingIfAppropriate() ");
-        List<Filter> filters = getFiltersFromEndpointListenerScope(sref, bctx);
-
-        for (ExportRegistration exReg : exportRegistrations) {
-            EndpointDescription endpoint = getExportedEndpoint(exReg);
-            List<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
-            for (Filter filter : matchingFilters) {
-                epl.endpointAdded(endpoint, filter.toString());
-            }
+        synchronized (exportedServices) {
+            exports = Collections.synchronizedMap(exportedServices.get(sref));
+        }
+        // FIXME: Not thread safe...?
+        if (exports == null) {
+            return;
+        }
+        if (remoteServiceAdminTracker == null || remoteServiceAdminTracker.size() == 0) {
+            LOG.log(Level.SEVERE,
+                    "No RemoteServiceAdmin available! Unable to export service from bundle {0}, interfaces: {1}",
+                    new Object[] {
+                            sref.getBundle().getSymbolicName(),
+                            sref.getProperty(org.osgi.framework.Constants.OBJECTCLASS) });
         }
 
-    }
-
-    protected void notifyListenersOfRemovalIfAppropriate(ServiceReference sref,
-            Collection<ExportRegistration> exportRegistrations) {
 
-        EndpointListener epl = (EndpointListener) bctx.getService(sref);
-        LOG.finer("TopologyManager: notifyListenerOfREMOVALIfAppropriate() ");
-        List<Filter> filters = getFiltersFromEndpointListenerScope(sref, bctx);
+        for (final RemoteServiceAdmin remoteServiceAdmin : remoteServiceAdminTracker
+                .getList()) {
+            LOG.info("TopologyManager: handling remoteServiceAdmin "
+                    + remoteServiceAdmin);
+
+            if (exports.containsKey(remoteServiceAdmin)) {
+                // already handled by this remoteServiceAdmin
+                LOG.fine("TopologyManager: already handled by this remoteServiceAdmin -> skipping");
+            } else {
+                // TODO: additional parameter Map ?
+                LOG.fine("TopologyManager: exporting ...");
+                Collection<ExportRegistration> endpoints = remoteServiceAdmin
+                        .exportService(sref, null);
+                if (endpoints == null) {
+                    // TODO export failed -> What should be done here?
+                    LOG.severe("TopologyManager: export failed");
+                    exports.put(remoteServiceAdmin, null);
+                } else {
+                    LOG.info("TopologyManager: export sucessful Endpoints:"
+                            + endpoints);
+                    // enqueue in local list of endpoints
+                    exports.put(remoteServiceAdmin, endpoints);
 
-        for (ExportRegistration exReg : exportRegistrations) {
-            EndpointDescription endpoint = getExportedEndpoint(exReg);
-            List<Filter> matchingFilters = getMatchingFilters(filters, endpoint);
-            for (Filter filter : matchingFilters) {
-                epl.endpointRemoved(endpoint, filter.toString());
+                    epListenerNotifier
+                            .nofifyEndpointListenersOfAdding(endpoints);
+                }
             }
         }
     }
     
-    static List<Filter> getFiltersFromEndpointListenerScope(ServiceReference sref,BundleContext bctx) {
-        List<Filter> filters = new ArrayList<Filter>();
-        try {
-            Object fo = sref.getProperty(EndpointListener.ENDPOINT_LISTENER_SCOPE);
-            if (fo instanceof String) {
-                filters.add(bctx.createFilter((String) fo));
-            } else if (fo instanceof String[]) {
-                String[] foArray = (String[]) fo;
-                for (String f : foArray) {
-                    filters.add(bctx.createFilter(f));
-                }
-            } else if (fo instanceof Collection) {
-                @SuppressWarnings("rawtypes")
-                Collection c = (Collection) fo;
-                for (Object o : c) {
-                    if (o instanceof String) {
-                        filters.add(bctx.createFilter((String) o));
-                    } else {
-                        LOG.warning("Component of a filter is not a string -> skipped !");
+    public Collection<ExportRegistration> getAllExportRegistrations() {
+        List<ExportRegistration> registrations = new ArrayList<ExportRegistration>();
+        synchronized (exportedServices) {
+            for (Map<RemoteServiceAdmin, Collection<ExportRegistration>> exports : exportedServices.values()) {
+                for (Collection<ExportRegistration> regs : exports.values()) {
+                    if (regs != null) {
+                        registrations.addAll(regs);
                     }
                 }
             }
-        } catch (InvalidSyntaxException e) {
-            LOG.log(Level.SEVERE, e.getMessage(), e);
-        }
-        return filters;
-    }
-
-    private List<Filter> getMatchingFilters(List<Filter> filters,
-            EndpointDescription endpoint) {
-        List<Filter> matchingFilters = new ArrayList<Filter>();
-        Dictionary<String, Object> d = getEndpointProperties(endpoint);
-
-        for (Filter filter : filters) {
-            if (LOG.isLoggable(Level.FINE)) {
-                LOG.fine("Matching: " + filter + "  against " + d);
-            }
-            if (filter.match(d)) {
-                LOG.fine("Listener matched one of the Endpoints !!!! --> calling removed() ...");
-                matchingFilters.add(filter);
-            }
         }
-        return matchingFilters;
+        return registrations;
     }
 
-    private void checkExistingServices() throws InvalidSyntaxException {
+    private void exportExistingServices() throws InvalidSyntaxException {
         String filter = "(" + RemoteConstants.SERVICE_EXPORTED_INTERFACES + "=*)";
         ServiceReference[] references = bctx.getServiceReferences(null, filter);
 
@@ -423,27 +320,10 @@ public class TopologyManager {
         // LOG.severe("NOT implemented !!!");
     }
 
-    /**
-     * Retrieve exported Endpoint while handling null
-     * @param exReg
-     * @return exported Endpoint or null if not present
-     */
-    private EndpointDescription getExportedEndpoint(ExportRegistration exReg) {
-        ExportReference ref = (exReg == null) ? null : exReg.getExportReference();
-        return (ref == null) ? null : ref.getExportedEndpoint(); 
-    }
-    
-    /**
-     * Retrieve endpoint properties as Dictionary
-     * 
-     * @param ep
-     * @return endpoint properties (will never return null) 
-     */
-    private Dictionary<String, Object> getEndpointProperties(EndpointDescription ep) {
-        if (ep == null || ep.getProperties() == null) {
-            return new Hashtable<String, Object>();
-        } else {
-            return new Hashtable<String, Object>(ep.getProperties());
+    public void remoteAdminEvent(RemoteServiceAdminEvent event) {
+        if (event.getType() == RemoteServiceAdminEvent.EXPORT_UNREGISTRATION) {
+            removeExportReference(event.getExportReference());
         }
     }
+
 }

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManagerImport.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManagerImport.java?rev=1403685&r1=1403684&r2=1403685&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManagerImport.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/TopologyManagerImport.java Tue Oct 30 13:10:20 2012
@@ -23,8 +23,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -33,20 +33,28 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.hooks.service.ListenerHook;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.service.remoteserviceadmin.ImportReference;
 import org.osgi.service.remoteserviceadmin.ImportRegistration;
 import org.osgi.service.remoteserviceadmin.RemoteServiceAdmin;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdminEvent;
+import org.osgi.service.remoteserviceadmin.RemoteServiceAdminListener;
 
-public class TopologyManagerImport {
+/**
+ * Listens for remote endpoints using the EndpointListener interface and the EndpointListenerManager.
+ * Listens for local service interests using the ServiceInterestListener interface. 
+ * Manages local creation and destruction of service imports using the available RemoteServiceAdmin services. 
+ */
+public class TopologyManagerImport implements EndpointListener, RemoteServiceAdminListener, ServiceInterestListener {
 
     private final static Logger LOG = Logger.getLogger(TopologyManagerImport.class.getName());
-    private ExecutorService execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS,
-                                                                 new LinkedBlockingQueue<Runnable>());
+    private ExecutorService execService;
 
-    private final EndpointListenerImpl endpointListener;
+    private final EndpointListenerManager endpointListenerManager;
     private final BundleContext bctx;
-    private final RemoteServiceAdminList remoteServiceAdminList;
+    private final RemoteServiceAdminTracker remoteServiceAdminTracker;
     private final ListenerHookImpl listenerHook;
 
     /**
@@ -62,105 +70,98 @@ public class TopologyManagerImport {
      * counter. If an interest is removed, the related ServiceInterest object is used to reduce the reference
      * counter until it reaches zero. in this case the interest is removed.
      */
-    private final Map<String/* filter */, ImportInterest> importInterests = new HashMap<String, ImportInterest>();
-
-    private static class ImportInterest {
-        String filter;
-        int refs;
-
-        public ImportInterest(String filter) {
-            this.filter = filter;
-            refs = 1;
-        }
-
-        public int addReference() {
-            return ++refs;
-        }
-
-        public int removeReference() {
-            return --refs;
-        }
-
-    }
+    private final RefManager importInterests = new RefManager();
 
     /**
-     * FIXME: Documnet me .... !
+     * FIXME: Document me .... !
      */
     private final Map<String /* filter */, List<EndpointDescription>> importPossibilities = new HashMap<String, List<EndpointDescription>>();
     private final Map<String /* filter */, List<ImportRegistration>> importedServices = new HashMap<String, List<ImportRegistration>>();
 
-    public TopologyManagerImport(BundleContext bc, RemoteServiceAdminList rsaList) {
+    public TopologyManagerImport(BundleContext bc, RemoteServiceAdminTracker rsaTracker) {
         bctx = bc;
-        remoteServiceAdminList = rsaList;
-        endpointListener = new EndpointListenerImpl(bctx, this);
-        listenerHook = new ListenerHookImpl(bctx, this);
+        this.remoteServiceAdminTracker = rsaTracker;
+        this.remoteServiceAdminTracker.addListener(new RemoteServiceAdminLifeCycleListener() {
+            public void added(RemoteServiceAdmin rsa) {
+                triggerImportsForRemoteServiceAdmin(rsa);
+            }
+
+            public void removed(RemoteServiceAdmin rsa) {
+            }
+        });
+        endpointListenerManager = new EndpointListenerManager(bctx, this);
+        execService = new ThreadPoolExecutor(5, 10, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        listenerHook = new ListenerHookImpl(bc, this);
         
-        if(rsaList == null || rsaList.size() == 0) {
-            LOG.log(Level.WARNING, "No RemoteServiceAdmin service available!");
-        }
     }
 
     public void start() {
-        // / register the EndpointListener for discovery
-        endpointListener.start();
-        listenerHook.start();
+        bctx.registerService(RemoteServiceAdminListener.class.getName(), this, null);
+        bctx.registerService(ListenerHook.class.getName(), listenerHook, null);
+        endpointListenerManager.start();
     }
 
     public void stop() {
         execService.shutdown();
-        endpointListener.stop();
-        listenerHook.stop();
     }
-
-    protected void addServiceInterest(String filter) {
-
-        String exFilter = Utils.extendFilter(filter, bctx);
-
-        synchronized (importInterests) {
-            ImportInterest i = importInterests.get(exFilter);
-            if (i != null) {
-                i.addReference();
-            } else {
-                importInterests.put(exFilter, new ImportInterest(exFilter));
-                endpointListener.extendScope(exFilter);
-            }
+    
+    /* (non-Javadoc)
+     * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#addServiceInterest(java.lang.String)
+     */
+    public void addServiceInterest(String filter) {
+        if (importInterests.addReference(filter) == 1) {
+            endpointListenerManager.extendScope(filter);
         }
-
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.cxf.dosgi.topologymanager.ServiceInterestListener#removeServiceInterest(java.lang.String)
+     */
     public void removeServiceInterest(String filter) {
-
-        String exFilter = Utils.extendFilter(filter, bctx);
-
-        synchronized (importInterests) {
-            ImportInterest i = importInterests.get(exFilter);
-            if (i != null) {
-                // remove reference
-                if (i.removeReference() <= 0) {
-                    // last reference, remove from scope
-                    LOG.log(Level.FINE,
-                            "last reference to import interest is gone -> removing interest  filter: {0}",
-                            exFilter);
-                    endpointListener.reduceScope(exFilter);
-                    importInterests.remove(exFilter);
-                    List<ImportRegistration> irs = importedServices.remove(exFilter);
-                    if (irs != null) {
-                        for (ImportRegistration ir : irs) {
-                            if (ir != null) {
-                                ir.close();
-                            }
-                        }
+        if (importInterests.removeReference(filter) <= 0) {
+            LOG.log(Level.FINE, "last reference to import interest is gone -> removing interest  filter: {0}", filter);
+            endpointListenerManager.reduceScope(filter);
+            List<ImportRegistration> irs = importedServices.remove(filter);
+            if (irs != null) {
+                for (ImportRegistration ir : irs) {
+                    if (ir != null) {
+                        ir.close();
                     }
                 }
-            } else {
-                // unhandled service ... do nothing
             }
         }
+    }
 
+    public void endpointAdded(EndpointDescription epd, String filter) {
+        if(filter==null){
+            LOG.severe("Endpoint is not handled because no matching filter was provided!");
+            return;
+        }
+        // Decide if it is worth it ? 
+        LOG.log(Level.FINE, "importable service added for filter {0} -> {1}", new Object[]{filter, epd});
+        addImportPossibility(epd, filter);
+        triggerImport(filter);
+    }
+
+    public void endpointRemoved(EndpointDescription epd, String filter) {
+        LOG.fine("EndpointRemoved -> " + epd);
+        removeImportPossibility(epd, filter);
+        triggerImport(filter);
     }
 
-    public void removeImportableService(String filter, EndpointDescription epd) {
+    private void addImportPossibility(EndpointDescription epd, String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> ips = importPossibilities.get(filter);
+            if (ips == null) {
+                ips = new ArrayList<EndpointDescription>();
+                importPossibilities.put(filter, ips);
+            }
+
+            ips.add(epd);
+        }
+    }
 
+    private void removeImportPossibility(EndpointDescription epd, String filter) {
         synchronized (importPossibilities) {
             List<EndpointDescription> ips = importPossibilities.get(filter);
             if (ips != null) {
@@ -169,43 +170,32 @@ public class TopologyManagerImport {
                 // should not happen
             }
         }
-
-        triggerImport(filter);
-
     }
 
-    public void addImportableService(String filter, EndpointDescription epd) {
-
-        LOG.log(Level.FINE, "importable service added for filter {0} -> {1}",
-                new Object[]{filter, epd});
-
+    public void triggerImportsForRemoteServiceAdmin(RemoteServiceAdmin rsa) {
+        LOG.fine("New RSA detected trying to import services with it");
         synchronized (importPossibilities) {
-            List<EndpointDescription> ips = importPossibilities.get(filter);
-            if (ips == null) {
-                ips = new ArrayList<EndpointDescription>();
-                importPossibilities.put(filter, ips);
+            Set<Map.Entry<String, List<EndpointDescription>>> entries = importPossibilities.entrySet();
+            for (Entry<String, List<EndpointDescription>> entry : entries) {
+                triggerImport(entry.getKey());
             }
-
-            ips.add(epd);
         }
-
-        triggerImport(filter);
     }
 
     private void triggerImport(final String filter) {
-
         LOG.log(Level.FINE, "import of a service for filter {0} was queued", filter);
 
         execService.execute(new Runnable() {
             public void run() {
-                synchronized (importedServices) { // deadlock possibility ?
-                    synchronized (importPossibilities) {
-                        if (importAllAvailable) {
-                            importAllServicesStrategy(filter);
-                        } else {
-                            importSingleServiceStrategy(filter);
+                try {
+                    synchronized (importedServices) { // deadlock possibility ?
+                        synchronized (importPossibilities) {
+                            unexportNotAvailableServices(filter);
+                            importServices(filter);
                         }
                     }
+                } catch (Exception e) {
+                    LOG.log(Level.SEVERE, e.getMessage(), e);
                 }
                 // Notify EndpointListeners ? NO!
             }
@@ -213,143 +203,85 @@ public class TopologyManagerImport {
         });
 
     }
-
-    private void importAllServicesStrategy(String filter) {
-
-        List<ImportRegistration> irs = importedServices.get(filter);
-        if (irs == null) {
-            irs = new ArrayList<ImportRegistration>();
-            importedServices.put(filter, irs);
-        }
-
-        if (irs.size() > 0) { // remove old services that are not available anymore
-            List<EndpointDescription> ips = importPossibilities.get(filter);
-            Iterator<ImportRegistration> it = irs.iterator();
-            while (it.hasNext()) {
-                ImportRegistration ir = it.next();
-                EndpointDescription ep = ir.getImportReference().getImportedEndpoint();
-
-                // if service is already imported, check if endpoint is still in the list of
-                // possible imports
-                if ((ips != null && !ips.contains(ep)) || ips == null) {
-                    // unexport service
-                    ir.close();
-                    it.remove();
-                }
-
+    
+    private void unexportNotAvailableServices(String filter) {
+        List<ImportRegistration> importRegistrations = getImportedServices(filter);
+        if (importRegistrations.size() == 0) {
+            return;
+        }
+            
+        Iterator<ImportRegistration> it = importRegistrations.iterator();
+        while (it.hasNext()) {
+            ImportRegistration ir = it.next();
+            EndpointDescription ep = ir.getImportReference().getImportedEndpoint();
+            if (!isImportPossibilityAvailable(ep, filter)) {
+                // unexport service
+                ir.close();
+                it.remove();
             }
         }
+    }
+    
+    private boolean isImportPossibilityAvailable(EndpointDescription ep, String filter) {
+        List<EndpointDescription> ips = importPossibilities.get(filter);
+        return ips != null && ips.contains(ep);
+    }
 
+    /**
+     * TODO but optional: if the service is already imported and the endpoint is still
+     * in the list of possible imports check if a "better" endpoint is now in the list ?
+     * 
+     * @param filter
+     */
+    private void importServices(String filter) {        
+        List<ImportRegistration> importRegistrations = getImportedServices(filter);
         for (EndpointDescription epd : importPossibilities.get(filter)) {
-            if (!irs.contains(epd)) {
+            if (!importRegistrations.contains(epd)) {
                 // service not imported yet -> import it now
                 ImportRegistration ir = importService(epd);
                 if (ir != null) {
                     // import was successful
-                    irs.add(ir);
-                }
-            }
-        }
-
-    }
-
-    private void importSingleServiceStrategy(final String filter) {
-
-        if (importedServices.containsKey(filter) && importedServices.get(filter) != null
-            && importedServices.get(filter).size() > 0) {
-            // a service was already imported ....
-            List<ImportRegistration> irs = importedServices.get(filter);
-            List<EndpointDescription> ips = importPossibilities.get(filter);
-
-            Iterator<ImportRegistration> it = irs.iterator();
-            while (it.hasNext()) {
-                ImportRegistration ir = it.next();
-                EndpointDescription ep = ir.getImportReference().getImportedEndpoint();
-
-                // if service is already imported, check if endpoint is still in the list of
-                // possible imports
-                if ((ips != null && !ips.contains(ep)) || ips == null) {
-                    // unexport service
-                    ir.close();
-                    it.remove();
-                }
-            }
-
-            if (irs.size() == 0) {
-                // if there are other import possibilities available, try to import them...
-                if (ips != null && ips.size() > 0) {
-                    triggerImport(filter);
-                }
-            }
-
-            // TODO but optional: if the service is already imported and the endpoint is still
-            // in the list
-            // of possible imports check if a "better" endpoint is now in the list ?
-
-        } else {
-            // if the service is not yet imported, try ...
-            if (importPossibilities.get(filter).size() > 0) {
-                for (EndpointDescription ep : importPossibilities.get(filter)) {
-                    ImportRegistration ir = importService(ep);
-                    if (ir != null) {
-                        // import was successful
-                        List<ImportRegistration> irs = importedServices.get(filter);
-                        if (irs == null) {
-                            irs = new ArrayList<ImportRegistration>(1);
-                            importedServices.put(filter, irs);
-                        }
-                        irs.add(ir);
-                        break;
-                    } else {
-                        // import of endpoint failed -> try next one
+                    importRegistrations.add(ir);
+                    if (!importAllAvailable) {
+                        return;
                     }
                 }
             }
-
         }
     }
 
+    /**
+     * Tries to import the service with each rsa until one import is successful 
+     * 
+     * @param ep endpoint to import
+     * @return import registration of the first successful import
+     */
     private ImportRegistration importService(EndpointDescription ep) {
-        synchronized (remoteServiceAdminList) {
-            if(remoteServiceAdminList == null || remoteServiceAdminList.size() == 0) {
-                LOG.log(Level.WARNING,
-                        "Unable to import service ({0}): no RemoteServiceAdmin service available!",
-                        ep);
-            }
-
-            for (RemoteServiceAdmin rsa : remoteServiceAdminList) {
-                ImportRegistration ir = rsa.importService(ep);
-                if (ir != null && ir.getException() == null) {
-                    // successful
-                    LOG.fine("service impoort was successful: " + ir);
-                    return ir;
-                } else {
-                    // failed -> next RSA
-                }
+        for (RemoteServiceAdmin rsa : remoteServiceAdminTracker.getList()) {
+            ImportRegistration ir = rsa.importService(ep);
+            if (ir != null && ir.getException() == null) {
+                LOG.fine("service impoort was successful: " + ir);
+                return ir;
             }
         }
         return null;
     }
 
-    public void removeImportRegistration(ImportRegistration importRegistration) {
-        synchronized (importedServices) {
-            if (importedServices.remove(importRegistration) != null) {
-                LOG.fine("removed imported service reference: " + importRegistration);
-            }
-        }
-    }
-
-    public void triggerExportImportForRemoteServiceAdmin(RemoteServiceAdmin rsa) {
-        LOG.fine("New RSA detected trying to import services with it");
-        synchronized (importPossibilities) {
-            Set<Map.Entry<String, List<EndpointDescription>>> entries = importPossibilities.entrySet();
-            for (Entry<String, List<EndpointDescription>> entry : entries) {
-                triggerImport(entry.getKey());
-            }
+    /**
+     * Returns the list of already imported services for the given filter
+     *  
+     * @param filter
+     * @return import registrations for filter (will never return null)
+     */
+    private List<ImportRegistration> getImportedServices(String filter) {
+        List<ImportRegistration> irs = importedServices.get(filter);
+        if (irs == null) {
+            irs = new ArrayList<ImportRegistration>();
+            importedServices.put(filter, irs);
         }
+        return irs;
     }
 
-    
     /**
      * This method is called once a RemoteServiceAdminEvent for an removed import reference is received.
      * However the current implementation has no special support for multiple topology managers, therefore this method
@@ -359,4 +291,10 @@ public class TopologyManagerImport {
         //LOG.severe("NOT IMPLEMENTED !!!");
     }
 
+    public void remoteAdminEvent(RemoteServiceAdminEvent event) {
+        if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
+            removeImportReference(event.getImportReference());
+        }
+    }
+
 }



Mime
View raw message