Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 61595 invoked from network); 14 Nov 2006 12:44:49 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 14 Nov 2006 12:44:49 -0000 Received: (qmail 25432 invoked by uid 500); 14 Nov 2006 12:44:59 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 25413 invoked by uid 500); 14 Nov 2006 12:44:59 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 25404 invoked by uid 99); 14 Nov 2006 12:44:59 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Nov 2006 04:44:59 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Nov 2006 04:44:47 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id A69E61A984F; Tue, 14 Nov 2006 04:44:17 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r474769 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Date: Tue, 14 Nov 2006 12:44:17 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061114124417.A69E61A984F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Tue Nov 14 04:44:16 2006 New Revision: 474769 URL: http://svn.apache.org/viewvc?view=rev&rev=474769 Log: Added synconization to the methods that setup connection state so that when an async error is detected, it properly does a full cleanup. Previously subscriptions were not properly cleaned up since they were being setup at the same time as they were being cleaned up. Also start 1 async exception thread ever since an async exception leads to connection tear down. Subsequent failures do not need additional async threads started. Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=474769&r1=474768&r2=474769 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Nov 14 04:44:16 2006 @@ -123,6 +123,7 @@ private AtomicBoolean stopped = new AtomicBoolean(false); protected final AtomicBoolean disposed=new AtomicBoolean(false); private CountDownLatch stopLatch = new CountDownLatch(1); + protected final AtomicBoolean asyncException = new AtomicBoolean(false); static class ConnectionState extends org.apache.activemq.state.ConnectionState { private final ConnectionContext context; @@ -217,11 +218,13 @@ * @param e */ public void serviceExceptionAsync(final IOException e) { - new Thread("Async Exception Handler") { - public void run() { - serviceException(e); - } - }.start(); + if( asyncException.compareAndSet(false, true) ) { + new Thread("Async Exception Handler") { + public void run() { + serviceException(e); + } + }.start(); + } } /** @@ -349,7 +352,7 @@ return null; } - public Response processBeginTransaction(TransactionInfo info) throws Exception { + synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -364,14 +367,14 @@ return null; } - public Response processEndTransaction(TransactionInfo info) throws Exception { + synchronized public Response processEndTransaction(TransactionInfo info) throws Exception { // No need to do anything. This packet is just sent by the client // make sure he is synced with the server as commit command could // come from a different connection. return null; } - public Response processPrepareTransaction(TransactionInfo info) throws Exception { + synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -395,7 +398,7 @@ } } - public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -409,7 +412,7 @@ } - public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -421,7 +424,7 @@ return null; } - public Response processRollbackTransaction(TransactionInfo info) throws Exception { + synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -433,7 +436,7 @@ return null; } - public Response processForgetTransaction(TransactionInfo info) throws Exception { + synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -443,7 +446,7 @@ return null; } - public Response processRecoverTransactions(TransactionInfo info) throws Exception { + synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -498,7 +501,7 @@ return null; } - public Response processAddDestination(DestinationInfo info) throws Exception { + synchronized public Response processAddDestination(DestinationInfo info) throws Exception { ConnectionState cs = lookupConnectionState(info.getConnectionId()); broker.addDestinationInfo(cs.getContext(), info); if( info.getDestination().isTemporary() ) { @@ -507,7 +510,7 @@ return null; } - public Response processRemoveDestination(DestinationInfo info) throws Exception { + synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception { ConnectionState cs = lookupConnectionState(info.getConnectionId()); broker.removeDestinationInfo(cs.getContext(), info); if( info.getDestination().isTemporary() ) { @@ -517,7 +520,7 @@ } - public Response processAddProducer(ProducerInfo info) throws Exception { + synchronized public Response processAddProducer(ProducerInfo info) throws Exception { SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -538,7 +541,7 @@ return null; } - public Response processRemoveProducer(ProducerId id) throws Exception { + synchronized public Response processRemoveProducer(ProducerId id) throws Exception { SessionId sessionId = id.getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -554,7 +557,7 @@ return null; } - public Response processAddConsumer(ConsumerInfo info) throws Exception { + synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception { SessionId sessionId = info.getConsumerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -576,7 +579,7 @@ return null; } - public Response processRemoveConsumer(ConsumerId id) throws Exception { + synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception { SessionId sessionId = id.getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -593,7 +596,7 @@ return null; } - public Response processAddSession(SessionInfo info) throws Exception { + synchronized public Response processAddSession(SessionInfo info) throws Exception { ConnectionId connectionId = info.getSessionId().getParentId(); ConnectionState cs = lookupConnectionState(connectionId); @@ -609,7 +612,7 @@ return null; } - public Response processRemoveSession(SessionId id) throws Exception { + synchronized public Response processRemoveSession(SessionId id) throws Exception { ConnectionId connectionId = id.getParentId(); @@ -646,7 +649,7 @@ return null; } - public Response processAddConnection(ConnectionInfo info) throws Exception { + synchronized public Response processAddConnection(ConnectionInfo info) throws Exception { ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId()); @@ -695,7 +698,7 @@ return null; } - public Response processRemoveConnection(ConnectionId id) { + synchronized public Response processRemoveConnection(ConnectionId id) { ConnectionState cs = lookupConnectionState(id);