Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE320200B41 for ; Wed, 22 Jun 2016 12:07:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ACF96160A35; Wed, 22 Jun 2016 10:07:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AA079160A2E for ; Wed, 22 Jun 2016 12:07:13 +0200 (CEST) Received: (qmail 27725 invoked by uid 500); 22 Jun 2016 10:07:12 -0000 Mailing-List: contact commits-help@aries.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aries.apache.org Delivered-To: mailing list commits@aries.apache.org Received: (qmail 27704 invoked by uid 99); 22 Jun 2016 10:07:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jun 2016 10:07:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B5562DFEDA; Wed, 22 Jun 2016 10:07:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cschneider@apache.org To: commits@aries.apache.org Date: Wed, 22 Jun 2016 10:07:12 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] aries-rsa git commit: [ARIES-1577] Deadlock in TopologyManagerImport archived-at: Wed, 22 Jun 2016 10:07:14 -0000 Repository: aries-rsa Updated Branches: refs/heads/master 7c73a3710 -> 08463cb24 [ARIES-1577] Deadlock in TopologyManagerImport Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/7627ef98 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/7627ef98 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/7627ef98 Branch: refs/heads/master Commit: 7627ef9867110cd072ec0a67d1b2642b205156c4 Parents: 7c73a37 Author: Johannes Utzig Authored: Tue Jun 21 11:44:27 2016 +0200 Committer: Christian Schneider Committed: Wed Jun 22 09:28:29 2016 +0200 ---------------------------------------------------------------------- .../importer/TopologyManagerImport.java | 187 ++++++++++--------- 1 file changed, 98 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/7627ef98/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java ---------------------------------------------------------------------- diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java index e548288..3b98710 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java @@ -19,14 +19,14 @@ package org.apache.aries.rsa.topologymanager.importer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -125,12 +125,10 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm if (importInterestsCounter.remove(filter) == 0) { LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter); endpointListenerManager.reduceScope(filter); - synchronized (importedServices) { - List irs = importedServices.remove(filter); - if (irs != null) { - for (ImportRegistration ir : irs) { - ir.close(); - } + List irs = remove(filter, importedServices); + if (irs != null) { + for (ImportRegistration ir : irs) { + ir.close(); } } } @@ -153,39 +151,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm } private void addImportPossibility(EndpointDescription endpoint, String filter) { - synchronized (importPossibilities) { - List endpoints = importPossibilities.get(filter); - if (endpoints == null) { - endpoints = new ArrayList(); - importPossibilities.put(filter, endpoints); - } - // prevent adding the same endpoint multiple times, which can happen sometimes, - // and which causes imports to remain available even when services are actually down - if (!endpoints.contains(endpoint)) { - endpoints.add(endpoint); - } - } + put(filter, importPossibilities, endpoint); } private void removeImportPossibility(EndpointDescription endpoint, String filter) { - synchronized (importPossibilities) { - List endpoints = importPossibilities.get(filter); - if (endpoints != null) { - endpoints.remove(endpoint); - if (endpoints.isEmpty()) { - importPossibilities.remove(filter); - } - } + List endpoints = get(filter, importPossibilities); + remove(filter, importPossibilities, endpoint); + if (endpoints.isEmpty()) { + remove(filter,importPossibilities,null); } } public void add(RemoteServiceAdmin rsa) { rsaSet.add(rsa); - synchronized (importPossibilities) { - for (String filter : importPossibilities.keySet()) { - triggerImport(filter); - } + + for (String filter : keySet(importPossibilities)) { + triggerImport(filter); } + } public void remove(RemoteServiceAdmin rsa) { @@ -210,56 +193,34 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm } private void unexportNotAvailableServices(String filter) { - synchronized (importedServices) { - List importRegistrations = importedServices.get(filter); - if (importRegistrations != null) { - // iterate over a copy - for (ImportRegistration ir : new ArrayList(importRegistrations)) { - EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint(); - if (!isImportPossibilityAvailable(endpoint, filter)) { - removeImport(ir, null); // also unexports the service - } - } + List importRegistrations = get(filter, importedServices); + for (ImportRegistration ir : importRegistrations) { + EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint(); + if (!isImportPossibilityAvailable(endpoint, filter)) { + removeImport(ir, null); // also unexports the service } } } private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) { - synchronized (importPossibilities) { - List endpoints = importPossibilities.get(filter); - return endpoints != null && endpoints.contains(endpoint); - } - } + List endpoints = get(filter, importPossibilities); + return endpoints != null && endpoints.contains(endpoint); - // return a copy to prevent sync issues - private List getImportPossibilitiesCopy(String filter) { - synchronized (importPossibilities) { - List possibilities = importPossibilities.get(filter); - return possibilities == null - ? Collections.emptyList() - : new ArrayList(possibilities); - } } private void importServices(String filter) { - synchronized (importedServices) { - List importRegistrations = importedServices.get(filter); - for (EndpointDescription endpoint : getImportPossibilitiesCopy(filter)) { - // TODO but optional: if the service is already imported and the endpoint is still - // in the list of possible imports check if a "better" endpoint is now in the list - if (!alreadyImported(endpoint, importRegistrations)) { - // service not imported yet -> import it now - ImportRegistration ir = importService(endpoint); - if (ir != null) { - // import was successful - if (importRegistrations == null) { - importRegistrations = new ArrayList(); - importedServices.put(filter, importRegistrations); - } - importRegistrations.add(ir); - if (!importAllAvailable) { - return; - } + List importRegistrations = get(filter, importedServices); + for (EndpointDescription endpoint : get(filter, importPossibilities)) { + // TODO but optional: if the service is already imported and the endpoint is still + // in the list of possible imports check if a "better" endpoint is now in the list + if (!alreadyImported(endpoint, importRegistrations)) { + // service not imported yet -> import it now + ImportRegistration ir = importService(endpoint); + if (ir != null) { + // import was successful + put(filter, importedServices, ir); + if (!importAllAvailable) { + return; } } } @@ -315,25 +276,19 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm // and receiving a RemoteServiceAdminEvent for its unregistration, which results // in a ConcurrentModificationException. We avoid this by closing the registrations // only after data structure manipulation is done, and being re-entrant. - synchronized (importedServices) { - List removed = new ArrayList(); - for (Iterator> it1 = importedServices.values().iterator(); it1.hasNext();) { - Collection irs = it1.next(); - for (Iterator it2 = irs.iterator(); it2.hasNext();) { - ImportRegistration ir = it2.next(); - if (ir.equals(reg) || ir.getImportReference().equals(ref)) { - removed.add(ir); - it2.remove(); - } + List removed = new ArrayList(); + Set>> entries = entrySet(importedServices); + for (Entry> entry : entries) { + for (ImportRegistration ir : entry.getValue()) { + if (ir.equals(reg) || ir.getImportReference().equals(ref)) { + removed.add(ir); + remove(entry.getKey(), importedServices, ir); } - if (irs.isEmpty()) { - it1.remove(); - } - } - for (ImportRegistration ir : removed) { - ir.close(); } } + for (ImportRegistration ir : removed) { + ir.close(); + } } public void remoteAdminEvent(RemoteServiceAdminEvent event) { @@ -342,4 +297,58 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm } } + private void put(String key, Map> map, T value) { + synchronized (map) { + List list = map.get(key); + if(list == null) { + list = new CopyOnWriteArrayList(); + map.put(key, list); + } + //make sure there is no duplicates + if(!list.contains(value)) { + list.add(value); + } + } + } + + private List get(String key, Map> map) { + synchronized (map) { + List list = map.get(key); + if(list == null) + return Collections.emptyList(); + return list; + } + } + + private List remove(String key, Map> map) { + synchronized (map) { + return map.remove(key); + } + } + + private void remove(String key, Map> map, T value) { + synchronized (map) { + List list = map.get(key); + if (list != null) { + list.remove(value); + if(list.isEmpty()) { + map.remove(key); + } + } + } + } + + private Set>> entrySet(Map> map) { + synchronized (map) { + Set>> entries = map.entrySet(); + return new HashSet>>(entries); + } + } + + private Set keySet(Map> map) { + synchronized (map) { + Set keySet = map.keySet(); + return new HashSet(keySet); + } + } }