activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1442122 - /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Date Mon, 04 Feb 2013 14:06:07 GMT
Author: gtully
Date: Mon Feb  4 14:06:06 2013
New Revision: 1442122

URL: http://svn.apache.org/viewvc?rev=1442122&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4082 - fix regression with org.apache.activemq.usecases.NetworkOfTwentyBrokersTest
nd org.apache.activemq.usecases.RequestReplyNoAdvisoryNetworkTest - statically included dests
in duplex case being ignored and unregister without register

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1442122&r1=1442121&r2=1442122&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Mon Feb  4 14:06:06 2013
@@ -261,6 +261,15 @@ public abstract class DemandForwardingBr
                     l.onStop(this);
                 }
                 try {
+                    // local start complete
+                    if (startedLatch.getCount() < 2) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace(configuration.getBrokerName() + " unregister bridge
(" + this + ") to " + remoteBrokerName);
+                        }
+                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
+                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
+                    }
+
                     remoteBridgeStarted.set(false);
                     final CountDownLatch sendShutdown = new CountDownLatch(1);
 
@@ -306,12 +315,8 @@ public abstract class DemandForwardingBr
                 }
             }
 
-            if (remoteBrokerInfo != null) {
-                brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
-                brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName
+ " stopped");
-                }
+            if (LOG.isInfoEnabled()) {
+                LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName
+ " stopped");
             }
         }
     }
@@ -381,43 +386,43 @@ public abstract class DemandForwardingBr
     }
 
     private void doStartLocalAndRemoteBridges() {
-        try {
-            startLocalBridge();
-        } catch (Throwable e) {
-            serviceLocalException(e);
+
+        if (disposed.get()) {
             return;
         }
 
-        try {
-
-            if (disposed.get()) {
-                return;
-            }
-
-            Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
+        if (isCreatedByDuplex()) {
+            // apply remote (propagated) configuration to local duplex bridge before start
+            Properties props = null;
             try {
+                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
                 IntrospectionSupport.getProperties(configuration, props, null);
                 if (configuration.getExcludedDestinations() != null) {
                     excludedDestinations = configuration.getExcludedDestinations().toArray(
-                        new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
                 }
                 if (configuration.getStaticallyIncludedDestinations() != null) {
                     staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
-                        new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
                 }
                 if (configuration.getDynamicallyIncludedDestinations() != null) {
                     dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
-                        new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
                 }
             } catch (Throwable t) {
-                LOG.error("Error mapping remote destinations", t);
+                LOG.error("Error mapping remote configuration: " + props, t);
             }
+        }
 
-            // Let the local broker know the remote broker's ID.
-            localBroker.oneway(remoteBrokerInfo);
-            // new peer broker (a consumer can work with remote broker also)
-            brokerService.getBroker().addBroker(null, remoteBrokerInfo);
+        try {
+            startLocalBridge();
+        } catch (Throwable e) {
+            serviceLocalException(e);
+            return;
+        }
 
+        try {
+            startRemoteBridge();
             startRemoteBridge();
         } catch (Throwable e) {
             serviceRemoteException(e);
@@ -454,7 +459,7 @@ public abstract class DemandForwardingBr
                     localBroker.oneway(localSessionInfo);
 
                     if (configuration.isDuplex()) {
-                        // separate in-bound chamnel for forwards so we don't
+                        // separate in-bound channel for forwards so we don't
                         // contend with out-bound dispatch on same connection
                         ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
                         duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
@@ -483,10 +488,17 @@ public abstract class DemandForwardingBr
                         l.onStart(this);
                     }
 
+                    // Let the local broker know the remote broker's ID.
+                    localBroker.oneway(remoteBrokerInfo);
+                    // new peer broker (a consumer can work with remote broker also)
+                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
+
                     if (LOG.isInfoEnabled()) {
                         LOG.info("Network connection between " + localBroker + " and " +
remoteBroker + "(" + remoteBrokerName + ") has been established.");
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace(configuration.getBrokerName() + " register bridge ("
+ this + ") to " + remoteBrokerName);
+                        }
                     }
-
                 } else {
                     LOG.warn("Bridge was disposed before the startLocalBridge() method was
fully executed.");
                 }
@@ -1137,7 +1149,7 @@ public abstract class DemandForwardingBr
                     LOG.error("Failed to add static destination " + dest, e);
                 }
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("bridging messages for static destination: " + dest);
+                    LOG.trace(configuration.getBrokerName() + ", bridging messages for static
destination: " + dest);
                 }
             }
         }



Mime
View raw message