aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [1/2] aries-rsa git commit: [ARIES-1577] Deadlock in TopologyManagerImport
Date Wed, 22 Jun 2016 10:07:12 GMT
Repository: aries-rsa
Updated Branches:
  refs/heads/master 7c73a3710 -> 08463cb24


[ARIES-1577] Deadlock in TopologyManagerImport

Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/7627ef98
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/7627ef98
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/7627ef98

Branch: refs/heads/master
Commit: 7627ef9867110cd072ec0a67d1b2642b205156c4
Parents: 7c73a37
Author: Johannes Utzig <j.utzig@seeburger.de>
Authored: Tue Jun 21 11:44:27 2016 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Wed Jun 22 09:28:29 2016 +0200

----------------------------------------------------------------------
 .../importer/TopologyManagerImport.java         | 187 ++++++++++---------
 1 file changed, 98 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/7627ef98/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index e548288..3b98710 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -19,14 +19,14 @@
 package org.apache.aries.rsa.topologymanager.importer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -125,12 +125,10 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
         if (importInterestsCounter.remove(filter) == 0) {
             LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
             endpointListenerManager.reduceScope(filter);
-            synchronized (importedServices) {
-                List<ImportRegistration> irs = importedServices.remove(filter);
-                if (irs != null) {
-                    for (ImportRegistration ir : irs) {
-                        ir.close();
-                    }
+            List<ImportRegistration> irs = remove(filter, importedServices);
+            if (irs != null) {
+                for (ImportRegistration ir : irs) {
+                    ir.close();
                 }
             }
         }
@@ -153,39 +151,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
     }
 
     private void addImportPossibility(EndpointDescription endpoint, String filter) {
-        synchronized (importPossibilities) {
-            List<EndpointDescription> endpoints = importPossibilities.get(filter);
-            if (endpoints == null) {
-                endpoints = new ArrayList<EndpointDescription>();
-                importPossibilities.put(filter, endpoints);
-            }
-            // prevent adding the same endpoint multiple times, which can happen sometimes,
-            // and which causes imports to remain available even when services are actually down
-            if (!endpoints.contains(endpoint)) {
-                endpoints.add(endpoint);
-            }
-        }
+        put(filter, importPossibilities, endpoint);
     }
 
     private void removeImportPossibility(EndpointDescription endpoint, String filter) {
-        synchronized (importPossibilities) {
-            List<EndpointDescription> endpoints = importPossibilities.get(filter);
-            if (endpoints != null) {
-                endpoints.remove(endpoint);
-                if (endpoints.isEmpty()) {
-                    importPossibilities.remove(filter);
-                }
-            }
+        List<EndpointDescription> endpoints = get(filter, importPossibilities);
+        remove(filter, importPossibilities, endpoint);
+        if (endpoints.isEmpty()) {
+            remove(filter,importPossibilities,null);
         }
     }
 
     public void add(RemoteServiceAdmin rsa) {
         rsaSet.add(rsa);
-        synchronized (importPossibilities) {
-            for (String filter : importPossibilities.keySet()) {
-                triggerImport(filter);
-            }
+
+        for (String filter : keySet(importPossibilities)) {
+            triggerImport(filter);
         }
+
     }
     
     public void remove(RemoteServiceAdmin rsa) {
@@ -210,56 +193,34 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
     }
 
     private void unexportNotAvailableServices(String filter) {
-        synchronized (importedServices) {
-            List<ImportRegistration> importRegistrations = importedServices.get(filter);
-            if (importRegistrations != null) {
-                // iterate over a copy
-                for (ImportRegistration ir : new ArrayList<ImportRegistration>(importRegistrations)) {
-                    EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
-                    if (!isImportPossibilityAvailable(endpoint, filter)) {
-                        removeImport(ir, null); // also unexports the service
-                    }
-                }
+        List<ImportRegistration> importRegistrations = get(filter, importedServices);
+        for (ImportRegistration ir : importRegistrations) {
+            EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
+            if (!isImportPossibilityAvailable(endpoint, filter)) {
+                removeImport(ir, null); // also unexports the service
             }
         }
     }
 
     private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) {
-        synchronized (importPossibilities) {
-            List<EndpointDescription> endpoints = importPossibilities.get(filter);
-            return endpoints != null && endpoints.contains(endpoint);
-        }
-    }
+        List<EndpointDescription> endpoints = get(filter, importPossibilities);
+        return endpoints != null && endpoints.contains(endpoint);
 
-    // return a copy to prevent sync issues
-    private List<EndpointDescription> getImportPossibilitiesCopy(String filter) {
-        synchronized (importPossibilities) {
-            List<EndpointDescription> possibilities = importPossibilities.get(filter);
-            return possibilities == null
-                ? Collections.<EndpointDescription>emptyList()
-                : new ArrayList<EndpointDescription>(possibilities);
-        }
     }
 
     private void importServices(String filter) {
-        synchronized (importedServices) {
-            List<ImportRegistration> importRegistrations = importedServices.get(filter);
-            for (EndpointDescription endpoint : getImportPossibilitiesCopy(filter)) {
-                // TODO but optional: if the service is already imported and the endpoint is still
-                // in the list of possible imports check if a "better" endpoint is now in the list
-                if (!alreadyImported(endpoint, importRegistrations)) {
-                    // service not imported yet -> import it now
-                    ImportRegistration ir = importService(endpoint);
-                    if (ir != null) {
-                        // import was successful
-                        if (importRegistrations == null) {
-                            importRegistrations = new ArrayList<ImportRegistration>();
-                            importedServices.put(filter, importRegistrations);
-                        }
-                        importRegistrations.add(ir);
-                        if (!importAllAvailable) {
-                            return;
-                        }
+        List<ImportRegistration> importRegistrations = get(filter, importedServices);
+        for (EndpointDescription endpoint : get(filter, importPossibilities)) {
+            // TODO but optional: if the service is already imported and the endpoint is still
+            // in the list of possible imports check if a "better" endpoint is now in the list
+            if (!alreadyImported(endpoint, importRegistrations)) {
+                // service not imported yet -> import it now
+                ImportRegistration ir = importService(endpoint);
+                if (ir != null) {
+                    // import was successful
+                    put(filter, importedServices, ir);
+                    if (!importAllAvailable) {
+                        return;
                     }
                 }
             }
@@ -315,25 +276,19 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
         // and receiving a RemoteServiceAdminEvent for its unregistration, which results
         // in a ConcurrentModificationException. We avoid this by closing the registrations
         // only after data structure manipulation is done, and being re-entrant.
-        synchronized (importedServices) {
-            List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
-            for (Iterator<List<ImportRegistration>> it1 = importedServices.values().iterator(); it1.hasNext();) {
-                Collection<ImportRegistration> irs = it1.next();
-                for (Iterator<ImportRegistration> it2 = irs.iterator(); it2.hasNext();) {
-                    ImportRegistration ir = it2.next();
-                    if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
-                        removed.add(ir);
-                        it2.remove();
-                    }
+        List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
+        Set<Entry<String, List<ImportRegistration>>> entries = entrySet(importedServices);
+        for (Entry<String, List<ImportRegistration>> entry : entries) {
+            for (ImportRegistration ir : entry.getValue()) {
+                if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
+                    removed.add(ir);
+                    remove(entry.getKey(), importedServices, ir);
                 }
-                if (irs.isEmpty()) {
-                    it1.remove();
-                }
-            }
-            for (ImportRegistration ir : removed) {
-                ir.close();
             }
         }
+        for (ImportRegistration ir : removed) {
+            ir.close();
+        }
     }
 
     public void remoteAdminEvent(RemoteServiceAdminEvent event) {
@@ -342,4 +297,58 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
         }
     }
 
+    private <T> void put(String key, Map<String, List<T>> map, T value) {
+        synchronized (map) {
+            List<T> list = map.get(key);
+            if(list == null) {
+                list = new CopyOnWriteArrayList<T>();
+                map.put(key, list);
+            }
+            //make sure there is no duplicates
+            if(!list.contains(value)) {
+                list.add(value);
+            }
+        }
+    }
+
+    private <T> List<T> get(String key, Map<String, List<T>> map) {
+        synchronized (map) {
+            List<T> list = map.get(key);
+            if(list == null)
+                return Collections.emptyList();
+            return list;
+        }
+    }
+
+    private <T> List<T> remove(String key, Map<String, List<T>> map) {
+        synchronized (map) {
+            return map.remove(key);
+        }
+    }
+
+    private <T> void remove(String key, Map<String, List<T>> map, T value) {
+        synchronized (map) {
+            List<T> list = map.get(key);
+            if (list != null) {
+                list.remove(value);
+                if(list.isEmpty()) {
+                    map.remove(key);
+                }
+            }
+        }
+    }
+
+    private <T> Set<Entry<String, List<T>>> entrySet(Map<String, List<T>> map) {
+        synchronized (map) {
+            Set<Entry<String, List<T>>> entries = map.entrySet();
+            return new HashSet<Entry<String, List<T>>>(entries);
+        }
+    }
+
+    private <T> Set<String> keySet(Map<String, List<T>> map) {
+        synchronized (map) {
+            Set<String> keySet = map.keySet();
+            return new HashSet<String>(keySet);
+        }
+    }
 }


Mime
View raw message