activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r636897 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
Date Thu, 13 Mar 2008 21:18:36 GMT
Author: rajdavies
Date: Thu Mar 13 14:18:35 2008
New Revision: 636897

URL: http://svn.apache.org/viewvc?rev=636897&view=rev
Log:
tidied up detection of stale subscriptions across a network

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java?rev=636897&r1=636896&r2=636897&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
Thu Mar 13 14:18:35 2008
@@ -16,33 +16,15 @@
  */
 package org.apache.activemq.broker.cluster;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisoryBroker;
-import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.TransportConnection;
-import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.Message;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -53,151 +35,52 @@
  * 
  * @version $Revision$
  */
-public class ConnectionSplitBroker extends BrokerFilter implements
-        MessageListener {
-    private static final Log LOG = LogFactory
-            .getLog(ConnectionSplitBroker.class);
-
-    private Connection connection;
-
-    private Map<ConnectionId, ConnectionContext> clientMap = new ConcurrentHashMap<ConnectionId,
ConnectionContext>();
-    private Map<ConsumerId,ConsumerInfo>consumerMap = new ConcurrentHashMap<ConsumerId,ConsumerInfo>();
+public class ConnectionSplitBroker extends BrokerFilter{
+    private static final Log LOG = LogFactory.getLog(ConnectionSplitBroker.class);
+    private List<ConsumerInfo>networkConsumerList = new ArrayList<ConsumerInfo>();
     public ConnectionSplitBroker(Broker next) {
         super(next);
     }
 
-    public void addConnection(ConnectionContext context, ConnectionInfo info)
-            throws Exception {
-        if (info != null) {
-            removeStaleConnection(info);
-            clientMap.put(info.getConnectionId(), context);
-        }
-        super.addConnection(context, info);
-    }
-
-    public void removeConnection(ConnectionContext context,
-            ConnectionInfo info, Throwable error) throws Exception {
-        if (info != null) {
-            clientMap.remove(info.getConnectionId());
-        }
-        super.removeConnection(context, info, error);
-    }
-    
+        
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws
Exception{
        
-        if (info.isNetworkSubscription()) {
-            List<ConsumerId>list = info.getNetworkConsumerIds();
-            for (ConsumerId id:list) {
-                consumerMap.put(id,info);
-            }
-        }else {
-            ConsumerInfo networkInfo = consumerMap.get(info.getConsumerId());
-            if (networkInfo != null) {
-                networkInfo.removeNetworkConsumerId(info.getConsumerId());
-                if (networkInfo.isNetworkConsumersEmpty()) {
-                    consumerMap.remove(info.getConsumerId());
-                    super.removeConsumer(context,networkInfo);
+        synchronized (networkConsumerList) {
+            if (info.isNetworkSubscription()) {
+                networkConsumerList.add(info);
+            } else {
+                List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
+                for (ConsumerInfo nc : networkConsumerList) {
+                    if (!nc.isNetworkConsumersEmpty()) {
+                        for (ConsumerId id : nc.getNetworkConsumerIds()) {
+                            if (id.equals(info.getConsumerId())) {
+                                nc.removeNetworkConsumerId(id);
+                                if (nc.isNetworkConsumersEmpty()) {
+                                    gcList.add(nc);
+                                }
+                            }
+                        }
+                    } else {
+                        gcList.add(nc);
+                    }
+                }
+                for (ConsumerInfo nc : gcList) {
+                    networkConsumerList.remove(nc);
+                    super.removeConsumer(context, nc);
+                    LOG.warn("Removed stale network consumer" + nc);
                 }
-                
             }
         }
         return super.addConsumer(context, info);
     }
 
-   
-    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception{
+    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception
{
         if (info.isNetworkSubscription()) {
-            List<ConsumerId>list = info.getNetworkConsumerIds();
-            for (ConsumerId id:list) {
-                consumerMap.remove(id);
-            }
-        }
-        super.removeConsumer(context, info);
-    }
-
-    public void start() throws Exception {
-        super.start();
-        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(
-                getBrokerService().getVmConnectorURI());
-        fac.setCloseTimeout(1);
-        fac.setWarnAboutUnstartedConnectionTimeout(10000);
-        fac.setWatchTopicAdvisories(false);
-        fac.setAlwaysSessionAsync(true);
-        fac.setClientID(getBrokerId().toString() + ":" + getBrokerName()
-                + ":ConnectionSplitBroker");
-        connection = fac.createConnection();
-        connection.start();
-        Session session = connection.createSession(false,
-                Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createConsumer(AdvisorySupport
-                .getConnectionAdvisoryTopic());
-        consumer.setMessageListener(this);
-    }
 
-    public synchronized void stop() throws Exception {
-        if (connection != null) {
-            connection.stop();
-            connection = null;
-        }
-        super.stop();
-    }
-
-    public void onMessage(javax.jms.Message m) {
-        ActiveMQMessage message = (ActiveMQMessage) m;
-
-        DataStructure o = message.getDataStructure();
-        if (o != null && o.getClass() == ConnectionInfo.class) {
-            ConnectionInfo info = (ConnectionInfo) o;
-            
-            String brokerId = null;
-            try {
-                brokerId = message
-                        .getStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID);
-                if (brokerId != null
-                        && !brokerId.equals(getBrokerId().getValue())) {
-                    // see if it already exits
-                    removeStaleConnection(info);
-                }
-            } catch (JMSException e) {
-                LOG.warn("Failed to get message property "
-                        + AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, e);
-            }
-
-        }
-
-    }
-
-    protected boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
-        if (brokerPath != null) {
-            for (int i = 0; i < brokerPath.length; i++) {
-                if (brokerId.equals(brokerPath[i])) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-    
-    protected void removeStaleConnection(ConnectionInfo info) {
-     // see if it already exits
-        ConnectionContext old = clientMap.remove(info
-                .getConnectionId());
-        if (old != null && old.getConnection() != null) {
-            String str = "connectionId=" + old.getConnectionId()
-                    + ",clientId=" + old.getClientId();
-            LOG.warn("Removing stale connection: " + str);
-            try {
-                // remove connection states
-                TransportConnection connection = (TransportConnection) old
-                        .getConnection();
-                connection.processRemoveConnection(old
-                        .getConnectionId());
-                connection.stopAsync();
-            } catch (Exception e) {
-                LOG.error("Failed to remove stale connection: "
-                        + str, e);
+            synchronized (networkConsumerList) {
+                networkConsumerList.remove(info);
             }
         }
+        super.removeConsumer(context, info);
     }
-
 }



Mime
View raw message