Author: chirino Date: Tue Nov 14 04:44:16 2006 New Revision: 474769 URL: http://svn.apache.org/viewvc?view=rev&rev=474769 Log: Added synchronization to the methods that set up connection state so that when an async error is detected, it properly does a full cleanup. Previously, subscriptions were not properly cleaned up since they were being set up at the same time as they were being cleaned up. Also, only one async exception thread is ever started, since an async exception leads to connection teardown. Subsequent failures do not need additional async threads started. Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=474769&r1=474768&r2=474769 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Nov 14 04:44:16 2006 @@ -123,6 +123,7 @@ private AtomicBoolean stopped = new AtomicBoolean(false); protected final AtomicBoolean disposed=new AtomicBoolean(false); private CountDownLatch stopLatch = new CountDownLatch(1); + protected final AtomicBoolean asyncException = new AtomicBoolean(false); static class ConnectionState extends org.apache.activemq.state.ConnectionState { private final ConnectionContext context; @@ -217,11 +218,13 @@ * @param e */ public void serviceExceptionAsync(final IOException e) { - new Thread("Async Exception Handler") { - public void run() { - serviceException(e); - } - }.start(); + if( asyncException.compareAndSet(false, true) ) { + new Thread("Async Exception Handler") { + public 
void run() { + serviceException(e); + } + }.start(); + } } /** @@ -349,7 +352,7 @@ return null; } - public Response processBeginTransaction(TransactionInfo info) throws Exception { + synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -364,14 +367,14 @@ return null; } - public Response processEndTransaction(TransactionInfo info) throws Exception { + synchronized public Response processEndTransaction(TransactionInfo info) throws Exception { // No need to do anything. This packet is just sent by the client // make sure he is synced with the server as commit command could // come from a different connection. return null; } - public Response processPrepareTransaction(TransactionInfo info) throws Exception { + synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -395,7 +398,7 @@ } } - public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -409,7 +412,7 @@ } - public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -421,7 +424,7 @@ return null; } - public Response processRollbackTransaction(TransactionInfo info) throws Exception { + synchronized public 
Response processRollbackTransaction(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -433,7 +436,7 @@ return null; } - public Response processForgetTransaction(TransactionInfo info) throws Exception { + synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -443,7 +446,7 @@ return null; } - public Response processRecoverTransactions(TransactionInfo info) throws Exception { + synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception { ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionContext context=null; if( cs!=null ) { @@ -498,7 +501,7 @@ return null; } - public Response processAddDestination(DestinationInfo info) throws Exception { + synchronized public Response processAddDestination(DestinationInfo info) throws Exception { ConnectionState cs = lookupConnectionState(info.getConnectionId()); broker.addDestinationInfo(cs.getContext(), info); if( info.getDestination().isTemporary() ) { @@ -507,7 +510,7 @@ return null; } - public Response processRemoveDestination(DestinationInfo info) throws Exception { + synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception { ConnectionState cs = lookupConnectionState(info.getConnectionId()); broker.removeDestinationInfo(cs.getContext(), info); if( info.getDestination().isTemporary() ) { @@ -517,7 +520,7 @@ } - public Response processAddProducer(ProducerInfo info) throws Exception { + synchronized public Response processAddProducer(ProducerInfo info) throws Exception { SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -538,7 +541,7 @@ 
return null; } - public Response processRemoveProducer(ProducerId id) throws Exception { + synchronized public Response processRemoveProducer(ProducerId id) throws Exception { SessionId sessionId = id.getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -554,7 +557,7 @@ return null; } - public Response processAddConsumer(ConsumerInfo info) throws Exception { + synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception { SessionId sessionId = info.getConsumerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -576,7 +579,7 @@ return null; } - public Response processRemoveConsumer(ConsumerId id) throws Exception { + synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception { SessionId sessionId = id.getParentId(); ConnectionId connectionId = sessionId.getParentId(); @@ -593,7 +596,7 @@ return null; } - public Response processAddSession(SessionInfo info) throws Exception { + synchronized public Response processAddSession(SessionInfo info) throws Exception { ConnectionId connectionId = info.getSessionId().getParentId(); ConnectionState cs = lookupConnectionState(connectionId); @@ -609,7 +612,7 @@ return null; } - public Response processRemoveSession(SessionId id) throws Exception { + synchronized public Response processRemoveSession(SessionId id) throws Exception { ConnectionId connectionId = id.getParentId(); @@ -646,7 +649,7 @@ return null; } - public Response processAddConnection(ConnectionInfo info) throws Exception { + synchronized public Response processAddConnection(ConnectionInfo info) throws Exception { ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId()); @@ -695,7 +698,7 @@ return null; } - public Response processRemoveConnection(ConnectionId id) { + synchronized public Response processRemoveConnection(ConnectionId id) { ConnectionState cs = lookupConnectionState(id);