activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r634797 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: SingleTransportConnectionStateRegister.java TransportConnection.java cluster/ConnectionSplitBroker.java
Date Fri, 07 Mar 2008 20:09:08 GMT
Author: rajdavies
Date: Fri Mar  7 12:09:07 2008
New Revision: 634797

URL: http://svn.apache.org/viewvc?rev=634797&view=rev
Log:
Improvements to removing stale connections

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java?rev=634797&r1=634796&r2=634797&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java Fri Mar  7 12:09:07 2008
@@ -109,10 +109,6 @@
 
     public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
         TransportConnectionState cs = connectionState;
-        if (cs == null) {
-            throw new IllegalStateException("Cannot lookup a connection that had not been registered: "
-                                            + connectionId);
-        }
         return cs;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=634797&r1=634796&r2=634797&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Mar  7 12:09:07 2008
@@ -673,44 +673,46 @@
         return null;
     }
 
-    public Response processRemoveConnection(ConnectionId id) throws InterruptedException {
+    public synchronized Response processRemoveConnection(ConnectionId id) throws InterruptedException {
         TransportConnectionState cs = lookupConnectionState(id);
-        // Don't allow things to be added to the connection state while we are
-        // shutting down.
-        cs.shutdown();
-        
-        // Cascade the connection stop to the sessions.
-        for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
-            SessionId sessionId = (SessionId)iter.next();
-            try {
-                processRemoveSession(sessionId);
-            } catch (Throwable e) {
-                SERVICELOG.warn("Failed to remove session " + sessionId, e);
+        if (cs != null) {
+            // Don't allow things to be added to the connection state while we are
+            // shutting down.
+            cs.shutdown();
+            
+            // Cascade the connection stop to the sessions.
+            for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
+                SessionId sessionId = (SessionId)iter.next();
+                try {
+                    processRemoveSession(sessionId);
+                } catch (Throwable e) {
+                    SERVICELOG.warn("Failed to remove session " + sessionId, e);
+                }
+            }
+            // Cascade the connection stop to temp destinations.
+            for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
+                DestinationInfo di = (DestinationInfo)iter.next();
+                try {
+                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
+                } catch (Throwable e) {
+                    SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
+                }
+                iter.remove();
             }
-        }
-        // Cascade the connection stop to temp destinations.
-        for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
-            DestinationInfo di = (DestinationInfo)iter.next();
             try {
-                broker.removeDestination(cs.getContext(), di.getDestination(), 0);
+                broker.removeConnection(cs.getContext(), cs.getInfo(), null);
             } catch (Throwable e) {
-                SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
+                SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e);
             }
-            iter.remove();
-        }
-        try {
-            broker.removeConnection(cs.getContext(), cs.getInfo(), null);
-        } catch (Throwable e) {
-            SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e);
-        }
-
-        TransportConnectionState state = unregisterConnectionState(id);
-        if (state != null) {
-            synchronized (brokerConnectionStates) {
-                // If we are the last reference, we should remove the state
-                // from the broker.
-                if (state.decrementReference() == 0) {
-                    brokerConnectionStates.remove(id);
+    
+            TransportConnectionState state = unregisterConnectionState(id);
+            if (state != null) {
+                synchronized (brokerConnectionStates) {
+                    // If we are the last reference, we should remove the state
+                    // from the broker.
+                    if (state.decrementReference() == 0) {
+                        brokerConnectionStates.remove(id);
+                    }
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java?rev=634797&r1=634796&r2=634797&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java Fri Mar  7 12:09:07 2008
@@ -112,9 +112,12 @@
 	        		     String str = "connectionId=" + old.getConnectionId() +",clientId="+old.getClientId();
 	        		     LOG.warn("Removing stale connection: " + str);
 	        		     try {
-                            old.getConnection().stop();
+	        		         //remove connection states
+	        		         TransportConnection connection = (TransportConnection) old.getConnection();
+                             connection.processRemoveConnection(old.getConnectionId());
+                             connection.stopAsync();
                         } catch (Exception e) {
-                            LOG.error("Failed to remove stale connection: " + str);
+                            LOG.error("Failed to remove stale connection: " + str,e);
                         }
 	        		 }
 	        	 }



Mime
View raw message