activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r634589 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/AdvisoryBroker.java broker/cluster/ConnectionSplitBroker.java
Date Fri, 07 Mar 2008 09:06:32 GMT
Author: rajdavies
Date: Fri Mar  7 01:06:30 2008
New Revision: 634589

URL: http://svn.apache.org/viewvc?rev=634589&view=rev
Log:
Detect clients fail over - that haven't removed themselves from
a broker

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=634589&r1=634588&r2=634589&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Fri Mar  7 01:06:30 2008
@@ -79,7 +79,7 @@
 
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception {
         Subscription answer = next.addConsumer(context, info);
-
+        
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java?rev=634589&r1=634588&r2=634589&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
Fri Mar  7 01:06:30 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.cluster;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -31,6 +32,8 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.ConnectionId;
@@ -50,14 +53,14 @@
 public class ConnectionSplitBroker extends BrokerFilter implements MessageListener{
 	private static final Log LOG = LogFactory.getLog(ConnectionSplitBroker.class);
     private Connection connection;
-    private Map <ConnectionId,ConnectionInfo>clientMap = new ConcurrentHashMap<ConnectionId,ConnectionInfo>();
+    private Map <ConnectionId,ConnectionContext>clientMap = new ConcurrentHashMap<ConnectionId,ConnectionContext>();
     public ConnectionSplitBroker(Broker next) {
         super(next);
     }
 
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception
{
     	if (info != null){
-    		clientMap.put(info.getConnectionId(),info);
+    		clientMap.put(info.getConnectionId(),context);
     	}
         super.addConnection(context, info);
     }
@@ -77,6 +80,7 @@
     	fac.setWarnAboutUnstartedConnectionTimeout(10000);
     	fac.setWatchTopicAdvisories(false);
     	fac.setAlwaysSessionAsync(true);
+    	fac.setClientID(getBrokerId().toString()+":" + getBrokerName() + ":ConnectionSplitBroker");
     	connection = fac.createConnection();
     	connection.start();
     	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -101,8 +105,18 @@
         	 String brokerId=null;
 			try {
 				brokerId = message.getStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID);
-				if (brokerId != null && brokerId.equals(getBrokerId().getValue())) {
-	        		 
+				if (brokerId != null && !brokerId.equals(getBrokerId().getValue())) {
+	        		 //see if it already exists
+	        		 ConnectionContext old = clientMap.remove(info.getConnectionId());
+	        		 if (old != null && old.getConnection() != null) {
+	        		     String str = "connectionId=" + old.getConnectionId() +",clientId="+old.getClientId();
+	        		     LOG.warn("Removing stale connection: " + str);
+	        		     try {
+                            old.getConnection().stop();
+                        } catch (Exception e) {
+                            LOG.error("Failed to remove stale connection: " + str);
+                        }
+	        		 }
 	        	 }
 			} catch (JMSException e) {
 				LOG.warn("Failed to get message property "+ AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID,e);



Mime
View raw message