Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E04E510B07 for ; Thu, 17 Oct 2013 00:30:09 +0000 (UTC) Received: (qmail 30449 invoked by uid 500); 17 Oct 2013 00:30:09 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 30406 invoked by uid 500); 17 Oct 2013 00:30:09 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 30399 invoked by uid 99); 17 Oct 2013 00:30:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 00:30:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 00:30:07 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9A4A023888A6; Thu, 17 Oct 2013 00:29:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1532945 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Date: Thu, 17 Oct 2013 00:29:47 -0000 To: commits@cxf.apache.org From: dkulp@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131017002947.9A4A023888A6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dkulp Date: Thu Oct 17 00:29:46 2013 New Revision: 1532945 URL: http://svn.apache.org/r1532945 Log: Fix some of the locking in RMManager to prevent some deadlocks on close where a shutdown would ask to terminate the sequence, but we could not process the sequence ack on the decoupled channel. Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1532945&r1=1532944&r2=1532945&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Thu Oct 17 00:29:46 2013 @@ -20,9 +20,9 @@ package org.apache.cxf.ws.rm; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.Timer; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -109,7 +109,7 @@ public class RMManager { private RMStore store; private SequenceIdentifierGenerator idGenerator; private RetransmissionQueue retransmissionQueue; - private Map reliableEndpoints = new HashMap(); + private Map reliableEndpoints = new ConcurrentHashMap(); private AtomicReference timer = new AtomicReference(); private RMConfiguration configuration; private SourcePolicyType sourcePolicy; @@ -308,7 +308,7 @@ public class RMManager { // The real stuff ... - public synchronized RMEndpoint getReliableEndpoint(Message message) throws RMException { + public RMEndpoint getReliableEndpoint(Message message) throws RMException { Endpoint endpoint = message.getExchange().get(Endpoint.class); QName name = endpoint.getEndpointInfo().getName(); if (LOG.isLoggable(Level.FINE)) { @@ -332,6 +332,7 @@ public class RMManager { addrUri = maps.getNamespaceURI(); } } + RMConfiguration config = getConfiguration(); if (rmUri != null) { config.setRMNamespace(rmUri); @@ -364,22 +365,28 @@ public class RMManager { } RMEndpoint rme = reliableEndpoints.get(endpoint); if (null == rme) { - rme = createReliableEndpoint(endpoint); - org.apache.cxf.transport.Destination destination = message.getExchange().getDestination(); - EndpointReferenceType replyTo = null; - if (null != destination) { - AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false); - replyTo = maps.getReplyTo(); - } - Endpoint ei = message.getExchange().get(Endpoint.class); - org.apache.cxf.transport.Destination dest - = ei == null ? null : ei.getEndpointInfo() - .getProperty(MAPAggregator.DECOUPLED_DESTINATION, - org.apache.cxf.transport.Destination.class); - config = RMPolicyUtilities.getRMConfiguration(config, message); - rme.initialise(config, message.getExchange().getConduit(message), replyTo, dest, message); - reliableEndpoints.put(endpoint, rme); - LOG.fine("Created new RMEndpoint."); + synchronized (endpoint) { + rme = reliableEndpoints.get(endpoint); + if (rme != null) { + return rme; + } + rme = createReliableEndpoint(endpoint); + org.apache.cxf.transport.Destination destination = message.getExchange().getDestination(); + EndpointReferenceType replyTo = null; + if (null != destination) { + AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false); + replyTo = maps.getReplyTo(); + } + Endpoint ei = message.getExchange().get(Endpoint.class); + org.apache.cxf.transport.Destination dest + = ei == null ? null : ei.getEndpointInfo() + .getProperty(MAPAggregator.DECOUPLED_DESTINATION, + org.apache.cxf.transport.Destination.class); + config = RMPolicyUtilities.getRMConfiguration(config, message); + rme.initialise(config, message.getExchange().getConduit(message), replyTo, dest, message); + reliableEndpoints.put(endpoint, rme); + LOG.fine("Created new RMEndpoint."); + } } return rme; } @@ -495,10 +502,8 @@ public class RMManager { // unregistring of this managed bean from the server is done by the bus itself } - synchronized void shutdownReliableEndpoint(Endpoint e) { - RMEndpoint rme = null; - - rme = reliableEndpoints.get(e); + void shutdownReliableEndpoint(Endpoint e) { + RMEndpoint rme = reliableEndpoints.get(e); if (rme == null) { // not found return; @@ -534,7 +539,9 @@ public class RMManager { new Object[] {null == conduit ? "client" : "server", id}); RMEndpoint rme = createReliableEndpoint(endpoint); rme.initialise(getConfiguration(), conduit, null, null, null); - reliableEndpoints.put(endpoint, rme); + synchronized (reliableEndpoints) { + reliableEndpoints.put(endpoint, rme); + } for (SourceSequence ss : sss) { recoverSourceSequence(endpoint, conduit, rme.getSource(), ss); } Modified: cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1532945&r1=1532944&r2=1532945&view=diff ============================================================================== --- cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original) +++ cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Thu Oct 17 00:29:46 2013 @@ -1576,13 +1576,13 @@ public class SequenceTest extends Abstra //ensure we close the decoupled destination of the conduit, //so that release the port if the destination reference count hit zero if (greeter != null) { - ClientProxy.getClient(greeter).getConduit().close(); + //ClientProxy.getClient(greeter).getConduit().close(); } if (greeter instanceof Closeable) { ((Closeable)greeter).close(); } if (dispatch != null) { - ((DispatchImpl)dispatch).getClient().getConduit().close(); + //((DispatchImpl)dispatch).getClient().getConduit().close(); } if (dispatch instanceof Closeable) { ((Closeable)dispatch).close();