activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r634411 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Date Thu, 06 Mar 2008 20:45:04 GMT
Author: chirino
Date: Thu Mar  6 12:45:02 2008
New Revision: 634411

URL: http://svn.apache.org/viewvc?rev=634411&view=rev
Log:
Fix for: https://issues.apache.org/activemq/browse/AMQ-1613
Avoid deadlocking with the thread calling oneway() when we are cleaning up the connection
due to an Inactivity error.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=634411&r1=634410&r2=634411&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java	Thu Mar  6 12:45:02 2008
@@ -29,6 +29,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
@@ -73,7 +74,7 @@
 
     private URI connectedTransportURI;
     private URI failedConnectTransportURI;
-    private Transport connectedTransport;
+    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
     private final TaskRunner reconnectTask;
     private final ExecutorService executor;
     private boolean started;
@@ -115,7 +116,7 @@
             public boolean iterate() {
             	boolean result=false;
             	boolean buildBackup=true;
-            	if (connectedTransport==null && !disposed) {
+            	if (connectedTransport.get()==null && !disposed) {
             		result=doReconnect();
             		buildBackup=false;
             	}
@@ -196,6 +197,12 @@
         if (transportListener != null) {
             transportListener.transportInterupted();
         }
+        
+        Transport transport = connectedTransport.get();
+        if( transport!=null ) {
+            ServiceSupport.dispose(transport);
+        }
+        
         synchronized (reconnectMutex) {
             boolean reconnectOk = false;
             if(started) {
@@ -204,11 +211,10 @@
                 reconnectOk = true;
             }
             
-            if (connectedTransport != null) {
+            if (connectedTransport.get() != null) {
                 initialized = false;
-                ServiceSupport.dispose(connectedTransport);
                 failedConnectTransportURI=connectedTransportURI;
-                connectedTransport = null;
+                connectedTransport.set(null);
                 connectedTransportURI = null;
                 connected=false;
             }
@@ -228,8 +234,8 @@
             started = true;
             stateTracker.setMaxCacheSize(getMaxCacheSize());
             stateTracker.setTrackMessages(isTrackMessages());
-            if (connectedTransport != null) {
-                stateTracker.restore(connectedTransport);
+            if (connectedTransport.get() != null) {
+                stateTracker.restore(connectedTransport.get());
             } else {
                 reconnect();
             }
@@ -247,9 +253,8 @@
             disposed = true;
             connected = false;
 
-            if (connectedTransport != null) {
-                transportToStop = connectedTransport;
-                connectedTransport = null;
+            if (connectedTransport.get() != null) {
+                transportToStop = connectedTransport.getAndSet(null);
             }
             reconnectMutex.notifyAll();
         }
@@ -296,7 +301,7 @@
     }
 
     public Transport getConnectedTransport() {
-        return connectedTransport;
+        return connectedTransport.get();
     }
 
     public URI getConnectedTransportURI() {
@@ -373,7 +378,7 @@
 
             synchronized (reconnectMutex) {
  
-                if (isShutdownCommand(command) && connectedTransport == null) {
+                if (isShutdownCommand(command) && connectedTransport.get() == null) {
                     if(command.isShutdownInfo()) {
                         // Skipping send of ShutdownInfo command when not connected.
                         return;
@@ -391,7 +396,7 @@
                     try {
 
                         // Wait for transport to be connected.
-                        while (connectedTransport == null && !disposed && connectionFailure == null) {
+                        while (connectedTransport.get() == null && !disposed && connectionFailure == null) {
                             LOG.trace("Waiting for transport to reconnect.");
                             try {
                                 reconnectMutex.wait(1000);
@@ -401,7 +406,7 @@
                             }
                         }
 
-                        if (connectedTransport == null) {
+                        if (connectedTransport.get() == null) {
                             // Previous loop may have exited due to use being
                             // disposed.
                             if (disposed) {
@@ -427,7 +432,7 @@
 
                         // Send the message.
                         try {
-                            connectedTransport.oneway(command);
+                            connectedTransport.get().oneway(command);
                             stateTracker.trackBack(command);
                         } catch (IOException e) {
 
@@ -559,10 +564,9 @@
         if (target.isAssignableFrom(getClass())) {
             return target.cast(this);
         }
-        synchronized (reconnectMutex) {
-            if (connectedTransport != null) {
-                return connectedTransport.narrow(target);
-            }
+        Transport transport = connectedTransport.get();
+        if ( transport != null) {
+            return transport.narrow(target);
         }
         return null;
 
@@ -594,8 +598,9 @@
     }
 
     public String getRemoteAddress() {
-        if (connectedTransport != null) {
-            return connectedTransport.getRemoteAddress();
+        Transport transport = connectedTransport.get();
+        if ( transport != null) {
+            return transport.getRemoteAddress();
         }
         return null;
     }
@@ -613,7 +618,7 @@
                 reconnectMutex.notifyAll();
             }
 
-            if (connectedTransport != null || disposed || connectionFailure != null) {
+            if (connectedTransport.get() != null || disposed || connectionFailure != null) {
                 return false;
             } else {
                 List<URI> connectList = getConnectList();
@@ -635,7 +640,7 @@
                             reconnectDelay = initialReconnectDelay;
                             failedConnectTransportURI=null;
                             connectedTransportURI = uri;
-                            connectedTransport = t;
+                            connectedTransport.set(t);
                             reconnectMutex.notifyAll();
                             connectFailures = 0;
                             LOG.info("Successfully reconnected to backup " + uri);
@@ -646,7 +651,7 @@
                     }
                     
                     Iterator<URI> iter = connectList.iterator();
-                    while(iter.hasNext() && connectedTransport == null && !disposed) {
+                    while(iter.hasNext() && connectedTransport.get() == null && !disposed) {
                         URI uri = iter.next();
                         try {
                             LOG.debug("Attempting connect to: " + uri);
@@ -661,7 +666,7 @@
                             LOG.debug("Connection established");
                             reconnectDelay = initialReconnectDelay;
                             connectedTransportURI = uri;
-                            connectedTransport = t;
+                            connectedTransport.set(t);
                             reconnectMutex.notifyAll();
                             connectFailures = 0;
                             if (transportListener != null) {



Mime
View raw message