Author: gtully
Date: Mon Feb 4 14:06:06 2013
New Revision: 1442122
URL: http://svn.apache.org/viewvc?rev=1442122&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4082 - fix regression with org.apache.activemq.usecases.NetworkOfTwentyBrokersTest
and org.apache.activemq.usecases.RequestReplyNoAdvisoryNetworkTest - statically included dests
in duplex case being ignored and unregister without register
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1442122&r1=1442121&r2=1442122&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Feb 4 14:06:06 2013
@@ -261,6 +261,15 @@ public abstract class DemandForwardingBr
l.onStop(this);
}
try {
+ // local start complete
+ if (startedLatch.getCount() < 2) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(configuration.getBrokerName() + " unregister bridge (" + this + ") to " + remoteBrokerName);
+ }
+ brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
+ brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
+ }
+
remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1);
@@ -306,12 +315,8 @@ public abstract class DemandForwardingBr
}
}
- if (remoteBrokerInfo != null) {
- brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
- brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
- if (LOG.isInfoEnabled()) {
- LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
- }
+ if (LOG.isInfoEnabled()) {
+ LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
}
}
}
@@ -381,43 +386,43 @@ public abstract class DemandForwardingBr
}
private void doStartLocalAndRemoteBridges() {
- try {
- startLocalBridge();
- } catch (Throwable e) {
- serviceLocalException(e);
+
+ if (disposed.get()) {
return;
}
- try {
-
- if (disposed.get()) {
- return;
- }
-
- Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
+ if (isCreatedByDuplex()) {
+ // apply remote (propagated) configuration to local duplex bridge before start
+ Properties props = null;
try {
+ props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
IntrospectionSupport.getProperties(configuration, props, null);
if (configuration.getExcludedDestinations() != null) {
excludedDestinations = configuration.getExcludedDestinations().toArray(
- new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+ new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
}
if (configuration.getStaticallyIncludedDestinations() != null) {
staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
- new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+ new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
}
if (configuration.getDynamicallyIncludedDestinations() != null) {
dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
- new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+ new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
}
} catch (Throwable t) {
- LOG.error("Error mapping remote destinations", t);
+ LOG.error("Error mapping remote configuration: " + props, t);
}
+ }
- // Let the local broker know the remote broker's ID.
- localBroker.oneway(remoteBrokerInfo);
- // new peer broker (a consumer can work with remote broker also)
- brokerService.getBroker().addBroker(null, remoteBrokerInfo);
+ try {
+ startLocalBridge();
+ } catch (Throwable e) {
+ serviceLocalException(e);
+ return;
+ }
+ try {
startRemoteBridge();
} catch (Throwable e) {
serviceRemoteException(e);
@@ -454,7 +459,7 @@ public abstract class DemandForwardingBr
localBroker.oneway(localSessionInfo);
if (configuration.isDuplex()) {
- // separate in-bound chamnel for forwards so we don't
+ // separate in-bound channel for forwards so we don't
// contend with out-bound dispatch on same connection
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
@@ -483,10 +488,17 @@ public abstract class DemandForwardingBr
l.onStart(this);
}
+ // Let the local broker know the remote broker's ID.
+ localBroker.oneway(remoteBrokerInfo);
+ // new peer broker (a consumer can work with remote broker also)
+ brokerService.getBroker().addBroker(null, remoteBrokerInfo);
+
if (LOG.isInfoEnabled()) {
LOG.info("Network connection between " + localBroker + " and " +
remoteBroker + "(" + remoteBrokerName + ") has been established.");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(configuration.getBrokerName() + " register bridge (" + this + ") to " + remoteBrokerName);
+ }
}
-
} else {
LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
}
@@ -1137,7 +1149,7 @@ public abstract class DemandForwardingBr
LOG.error("Failed to add static destination " + dest, e);
}
if (LOG.isTraceEnabled()) {
- LOG.trace("bridging messages for static destination: " + dest);
+ LOG.trace(configuration.getBrokerName() + ", bridging messages for static destination: " + dest);
}
}
}
|