cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amic...@apache.org
Subject svn commit: r1487678 - /cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/
Date Wed, 29 May 2013 23:24:12 GMT
Author: amichai
Date: Wed May 29 23:24:12 2013
New Revision: 1487678

URL: http://svn.apache.org/r1487678
Log:
DOSGI-172 Fix various synchronization issues in discovery.zookeeper package

Modified:
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java
    cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java?rev=1487678&r1=1487677&r2=1487678&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/EndpointListenerTrackerCustomizer.java Wed May 29 23:24:12 2013
@@ -34,7 +34,7 @@ public class EndpointListenerTrackerCust
     private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerTrackerCustomizer.class);
     private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
 
-    private InterfaceMonitorManager imManager;
+    private final InterfaceMonitorManager imManager;
 
     public EndpointListenerTrackerCustomizer(InterfaceMonitorManager imManager) {
         this.imManager = imManager;

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java?rev=1487678&r1=1487677&r2=1487678&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitor.java Wed May 29 23:24:12 2013
@@ -52,7 +52,7 @@ public class InterfaceMonitor implements
     private final ZooKeeper zookeeper;
     private final EndpointListener epListener;
     private final boolean recursive;
-    private boolean closed;
+    private volatile boolean closed;
 
     // This map reference changes, so don't synchronize on it
     private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
@@ -130,7 +130,8 @@ public class InterfaceMonitor implements
         }
     }
 
-    public void close() {
+    public synchronized void close() {
+        closed = true;
         for (EndpointDescription epd : nodes.values()) {
             epListener.endpointRemoved(epd, null);
         }
@@ -138,6 +139,9 @@ public class InterfaceMonitor implements
     }
 
     private synchronized void refreshNodes() {
+        if (closed) {
+            return;
+        }
         LOG.info("Processing change on node: {}", znode);
 
         Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java?rev=1487678&r1=1487677&r2=1487678&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/InterfaceMonitorManager.java Wed May 29 23:24:12 2013
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.BundleContext;
@@ -46,13 +47,13 @@ public class InterfaceMonitorManager {
     private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
     
     private final ZooKeeper zooKeeper;
-    private final Map<ServiceReference, List<String> /* scopes of the epl */> handledEndpointlisteners 
+    private final Map<ServiceReference, List<String> /* scopes of the epl */> handledEndpointlisteners
         = new HashMap<ServiceReference, List<String>>();
     private final Map<String /* scope */, Interest> interestingScopes = new HashMap<String, Interest>();
     private final BundleContext bctx;
 
     protected static class Interest {
-        List<ServiceReference> relatedServiceListeners = new ArrayList<ServiceReference>(1);
+        List<ServiceReference> relatedServiceListeners = new CopyOnWriteArrayList<ServiceReference>();
         InterfaceMonitor im;
     }
     
@@ -60,54 +61,50 @@ public class InterfaceMonitorManager {
         this.bctx = bctx;
         this.zooKeeper = zooKeeper;
     }
-    
-    public void addInterest(ServiceReference sref, String scope, String objClass) {
-        synchronized (interestingScopes) {
-            synchronized (handledEndpointlisteners) {
-                Interest interest = interestingScopes.get(scope);
-                if (interest == null) {
-                    interest = new Interest();
-                    interestingScopes.put(scope, interest);
-                }
-                
-                if (!interest.relatedServiceListeners.contains(sref)) {
-                    interest.relatedServiceListeners.add(sref);
-                }
 
-                if (interest.im == null) {
-                    interest.im = createInterfaceMonitor(scope, objClass, interest);
-                    interest.im.start();
-                }
+    public synchronized void addInterest(ServiceReference sref, String scope, String objClass) {
+        Interest interest = interestingScopes.get(scope);
+        if (interest == null) {
+            interest = new Interest();
+            interestingScopes.put(scope, interest);
+        }
 
-                List<String> handledScopes = handledEndpointlisteners.get(sref);
-                if (handledScopes == null) {
-                    handledScopes = new ArrayList<String>(1);
-                    handledEndpointlisteners.put(sref, handledScopes);
-                }
+        if (!interest.relatedServiceListeners.contains(sref)) {
+            interest.relatedServiceListeners.add(sref);
+        }
 
-                if (!handledScopes.contains(scope)) {
-                    handledScopes.add(scope);
-                }
+        if (interest.im == null) {
+            interest.im = createInterfaceMonitor(scope, objClass, interest);
+            interest.im.start();
+        }
 
-            }
+        List<String> handledScopes = handledEndpointlisteners.get(sref);
+        if (handledScopes == null) {
+            handledScopes = new ArrayList<String>(1);
+            handledEndpointlisteners.put(sref, handledScopes);
+        }
+
+        if (!handledScopes.contains(scope)) {
+            handledScopes.add(scope);
         }
     }
     
     /**
      * Only for test case !
      * */
-    protected Map<String, Interest> getInterestingScopes() {
+    protected synchronized Map<String, Interest> getInterestingScopes() {
         return interestingScopes;
     }
 
     /**
      * Only for test case !
      * */
-    protected Map<ServiceReference, List<String>>  getHandledEndpointlisteners() {
+    protected synchronized Map<ServiceReference, List<String>> getHandledEndpointlisteners() {
         return handledEndpointlisteners;
     }
     
     private InterfaceMonitor createInterfaceMonitor(String scope, String objClass, final Interest interest) {
+        // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
         EndpointListener epListener = new EndpointListener() {
             public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
                 notifyListeners(endpoint, false, interest.relatedServiceListeners);
@@ -120,7 +117,7 @@ public class InterfaceMonitorManager {
         return new InterfaceMonitor(zooKeeper, objClass, epListener, scope);
     }
 
-    public void removeInterest(ServiceReference sref) {
+    public synchronized void removeInterest(ServiceReference sref) {
         List<String> handledScopes = handledEndpointlisteners.get(sref);
         if (handledScopes == null) {
             return;
@@ -178,7 +175,7 @@ public class InterfaceMonitorManager {
         }
     }
 
-    public void close() {
+    public synchronized void close() {
         for (Interest interest : interestingScopes.values()) {
             interest.im.close();
         }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java?rev=1487678&r1=1487677&r2=1487678&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListener.java Wed May 29 23:24:12 2013
@@ -27,7 +27,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.cxf.dosgi.discovery.local.LocalDiscoveryUtils;
 import org.apache.zookeeper.CreateMode;
@@ -35,7 +34,6 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointListener;
 import org.osgi.util.tracker.ServiceTracker;
@@ -49,42 +47,23 @@ public class PublishingEndpointListener 
     private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
 
     private final ZooKeeper zookeeper;
-    private final List<DiscoveryPlugin> discoveryPlugins = new CopyOnWriteArrayList<DiscoveryPlugin>();
     private final ServiceTracker discoveryPluginTracker;
     private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
     private boolean closed;
 
     public PublishingEndpointListener(ZooKeeper zooKeeper, BundleContext bctx) {
         this.zookeeper = zooKeeper;
-
-        discoveryPluginTracker = new ServiceTracker(bctx, DiscoveryPlugin.class.getName(), null) {
-            @Override
-            public Object addingService(ServiceReference reference) {
-                Object svc = super.addingService(reference);
-                if (svc instanceof DiscoveryPlugin) {
-                    discoveryPlugins.add((DiscoveryPlugin) svc);
-                }
-                return svc;
-            }
-
-            @Override
-            public void removedService(ServiceReference reference, Object service) {
-                discoveryPlugins.remove(service);
-                super.removedService(reference, service);
-            }
-        };
+        discoveryPluginTracker = new ServiceTracker(bctx, DiscoveryPlugin.class.getName(), null);
         discoveryPluginTracker.open();
     }
 
     public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
         LOG.info("Local EndpointDescription added: " + endpoint);
 
-        if (closed) {
-            return;
-        }
-
         synchronized (endpoints) {
-
+            if (closed) {
+                return;
+            }
             if (endpoints.contains(endpoint)) {
                 // TODO -> Should the published endpoint be updated here ?
                 return;
@@ -104,18 +83,23 @@ public class PublishingEndpointListener 
                                                                   InterruptedException, IOException {
         Collection<String> interfaces = endpoint.getInterfaces();
         String endpointKey = getKey(endpoint.getId());
+        Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
 
-        for (String name : interfaces) {
-            Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
-            for (DiscoveryPlugin plugin : discoveryPlugins) {
-                endpointKey = plugin.process(props, endpointKey);
+        // process plugins
+        Object[] plugins = discoveryPluginTracker.getServices();
+        if (plugins != null) {
+            for (Object plugin : plugins) {
+                if (plugin instanceof DiscoveryPlugin) {
+                    endpointKey = ((DiscoveryPlugin)plugin).process(props, endpointKey);
+                }
             }
+        }
 
+        for (String name : interfaces) {
             String path = Util.getZooKeeperPath(name);
-            ensurePath(path, zookeeper);
-
             String fullPath = path + '/' + endpointKey;
             LOG.debug("Creating ZooKeeper node: {}", fullPath);
+            ensurePath(path, zookeeper);
             zookeeper.create(fullPath, getData(props), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         }
     }
@@ -123,11 +107,10 @@ public class PublishingEndpointListener 
     public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
         LOG.info("Local EndpointDescription removed: " + endpoint);
 
-        if (closed) {
-            return;
-        }
-
         synchronized (endpoints) {
+            if (closed) {
+                return;
+            }
             if (!endpoints.contains(endpoint)) {
                 return;
             }
@@ -195,6 +178,7 @@ public class PublishingEndpointListener 
     public void close() {
         LOG.debug("closing - removing all endpoints");
         synchronized (endpoints) {
+            closed = true;
             for (EndpointDescription ed : endpoints) {
                 try {
                     removeEndpoint(ed);

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java?rev=1487678&r1=1487677&r2=1487678&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/PublishingEndpointListenerFactory.java Wed May 29 23:24:12 2013
@@ -41,9 +41,9 @@ public class PublishingEndpointListenerF
     public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
     private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListenerFactory.class);
 
-    private BundleContext bctx;
-    private ZooKeeper zookeeper;
-    private List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
+    private final BundleContext bctx;
+    private final ZooKeeper zookeeper;
+    private final List<PublishingEndpointListener> listeners = new ArrayList<PublishingEndpointListener>();
     private ServiceRegistration serviceRegistration;
 
     public PublishingEndpointListenerFactory(ZooKeeper zooKeeper, BundleContext bctx) {
@@ -82,18 +82,22 @@ public class PublishingEndpointListenerF
         if (serviceRegistration != null) {
             serviceRegistration.unregister();
         }
-        
-        for (PublishingEndpointListener epl : listeners) {
-            epl.close();
+
+        synchronized (listeners) {
+            for (PublishingEndpointListener epl : listeners) {
+                epl.close();
+            }
+            listeners.clear();
         }
-        listeners.clear();
     }
 
     /**
      * only for the test case !
      */
     protected List<PublishingEndpointListener> getListeners() {
-        return listeners;
+        synchronized (listeners) {
+            return listeners;
+        }
     }
     
 }

Modified: cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java?rev=1487678&r1=1487677&r2=1487678&view=diff
==============================================================================
--- cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java (original)
+++ cxf/dosgi/trunk/discovery/distributed/cxf-discovery/src/main/java/org/apache/cxf/dosgi/discovery/zookeeper/ZooKeeperDiscovery.java Wed May 29 23:24:12 2013
@@ -59,9 +59,7 @@ public class ZooKeeperDiscovery implemen
             LOG.debug("Received configuration update for Zookeeper Discovery: " + configuration);
         }
 
-        synchronized (this) {
-            stop();
-        }
+        stop();
 
         if (configuration == null) {
             return;
@@ -70,7 +68,7 @@ public class ZooKeeperDiscovery implemen
         createZooKeeper(configuration);
     }
 
-    private void start() {
+    private synchronized void start() {
         LOG.debug("starting ZookeeperDiscovery");
         endpointListenerFactory = new PublishingEndpointListenerFactory(zooKeeper, bctx);
         endpointListenerFactory.start();
@@ -101,7 +99,7 @@ public class ZooKeeperDiscovery implemen
     }
 
     @SuppressWarnings("rawtypes")
-    private void createZooKeeper(Dictionary props) {
+    private synchronized void createZooKeeper(Dictionary props) {
         String zkHost = getProp(props, "zookeeper.host", "localhost");
         String zkPort = getProp(props, "zookeeper.port", "2181");
         int zkTimeout = Integer.parseInt(getProp(props, "zookeeper.timeout", "3000"));



Mime
View raw message