activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r637609 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/cluster/ConnectionSplitBroker.java network/ConduitBridge.java network/DemandForwardingBridgeSupport.java
Date Sun, 16 Mar 2008 16:31:00 GMT
Author: rajdavies
Date: Sun Mar 16 09:30:55 2008
New Revision: 637609

URL: http://svn.apache.org/viewvc?rev=637609&view=rev
Log:
Ensure we detect Connection splits for any type of network

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java Sun Mar 16 09:30:55 2008
@@ -44,39 +44,39 @@
     }
 
         
-    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception{
+    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
+            throws Exception {
         ActiveMQDestination dest = info.getDestination();
-        boolean validDestination = dest != null && !dest.isTemporary();
-        if (validDestination) {
-            synchronized (networkConsumerList) {
-                if (info.isNetworkSubscription()) {
-                    networkConsumerList.add(info);
-                } else {
-                    if(!networkConsumerList.isEmpty()) {
-                        List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
-                        for (ConsumerInfo nc : networkConsumerList) {
-                            if (!nc.isNetworkConsumersEmpty()) {
-                                for (ConsumerId id : nc.getNetworkConsumerIds()) {
-                                    if (id.equals(info.getConsumerId())) {
-                                        nc.removeNetworkConsumerId(id);
-                                        if (nc.isNetworkConsumersEmpty()) {
-                                            gcList.add(nc);
-                                        }
+
+        synchronized (networkConsumerList) {
+            if (info.isNetworkSubscription()) {
+                networkConsumerList.add(info);
+            } else {
+                if (!networkConsumerList.isEmpty()) {
+                    List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
+                    for (ConsumerInfo nc : networkConsumerList) {
+                        if (!nc.isNetworkConsumersEmpty()) {
+                            
+                            for (ConsumerId id : nc.getNetworkConsumerIds()) {
+                                
+                                if (id.equals(info.getConsumerId())) {
+                                    nc.removeNetworkConsumerId(id);
+                                    if (nc.isNetworkConsumersEmpty()) {
+                                        gcList.add(nc);
                                     }
                                 }
-                            } else {
-                                gcList.add(nc);
                             }
                         }
-                        for (ConsumerInfo nc : gcList) {
-                            networkConsumerList.remove(nc);
-                            super.removeConsumer(context, nc);
-                            LOG.warn("Removed stale network consumer " + nc);
-                        }
+                    }
+                    for (ConsumerInfo nc : gcList) {
+                        networkConsumerList.remove(nc);
+                        super.removeConsumer(context, nc);
+                        LOG.warn("Removed stale network consumer " + nc);
                     }
                 }
             }
         }
+
         return super.addConsumer(context, info);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Sun Mar 16 09:30:55 2008
@@ -51,6 +51,8 @@
         if (addToAlreadyInterestedConsumers(info)) {
             return null; // don't want this subscription added
         }
+        //add our original id to ourselves
+        info.addNetworkConsumerId(info.getConsumerId());
         return doCreateDemandSubscription(info);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Sun Mar 16 09:30:55 2008
@@ -820,11 +820,14 @@
         }
         return result;
     }
-
+    
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
+        //add our original id to ourselves
+        info.addNetworkConsumerId(info.getConsumerId());
         return doCreateDemandSubscription(info);
     }
 
+    
     protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
         DemandSubscription result = new DemandSubscription(info);
         result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
@@ -905,6 +908,7 @@
     protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
 
     protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
+    
 
     protected abstract BrokerId[] getRemoteBrokerPath();
 



Mime
View raw message