activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1440181 - /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Date Tue, 29 Jan 2013 22:14:10 GMT
Author: tabish
Date: Tue Jan 29 22:14:10 2013
New Revision: 1440181

URL: http://svn.apache.org/viewvc?rev=1440181&view=rev
Log:
Some additional code cleanup, doc fixes, etc. 

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1440181&r1=1440180&r2=1440181&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Tue Jan 29 22:14:10 2013
@@ -127,7 +127,7 @@ public abstract class DemandForwardingBr
     protected NetworkBridgeConfiguration configuration;
     protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
 
-    protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
+    protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
     protected Object brokerInfoMutex = new Object();
     protected BrokerId remoteBrokerId;
 
@@ -224,7 +224,7 @@ public abstract class DemandForwardingBr
                     LOG.warn("Caught exception from remote start", e);
                 }
             } else {
-                LOG.warn ("Bridge was disposed before the start() method was fully executed.");
+                LOG.warn("Bridge was disposed before the start() method was fully executed.");
                 throw new TransportDisposedIOException();
             }
         }
@@ -288,16 +288,18 @@ public abstract class DemandForwardingBr
                     // sync requests that may fail
                     Object resp = localBroker.request(localConnectionInfo);
                     if (resp instanceof ExceptionResponse) {
-                        throw ((ExceptionResponse)resp).getException();
+                        throw ((ExceptionResponse) resp).getException();
                     }
                     localSessionInfo = new SessionInfo(localConnectionInfo, 1);
                     localBroker.oneway(localSessionInfo);
 
                     if (configuration.isDuplex()) {
-                        // separate inbound chanel for forwards so we don't contend with
outbound dispatch on same connection
+                        // separate in-bound channel for forwards so we don't
+                        // contend with out-bound dispatch on same connection
                         ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
                         duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_"
+ remoteBrokerName + "_inbound_duplex_" + configuration.getBrokerName());
+                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_"
+ remoteBrokerName + "_inbound_duplex_"
+                            + configuration.getBrokerName());
                         duplexLocalConnectionInfo.setUserName(configuration.getUserName());
                         duplexLocalConnectionInfo.setPassword(configuration.getPassword());
 
@@ -308,7 +310,7 @@ public abstract class DemandForwardingBr
                         // sync requests that may fail
                         resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
                         if (resp instanceof ExceptionResponse) {
-                            throw ((ExceptionResponse)resp).getException();
+                            throw ((ExceptionResponse) resp).getException();
                         }
                         SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo,
1);
                         duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession,
1);
@@ -323,7 +325,7 @@ public abstract class DemandForwardingBr
                     LOG.info("Network connection between " + localBroker + " and " + remoteBroker
+ "(" + remoteBrokerName + ") has been established.");
 
                 } else {
-                    LOG.warn ("Bridge was disposed before the startLocalBridge() method was
fully executed.");
+                    LOG.warn("Bridge was disposed before the startLocalBridge() method was
fully executed.");
                 }
                 startedLatch.countDown();
                 localStartedLatch.countDown();
@@ -334,7 +336,8 @@ public abstract class DemandForwardingBr
             if (!disposed.get()) {
                 setupStaticDestinations();
             } else {
-                LOG.warn("Network connection between " + localBroker + " and " + remoteBroker
+ "(" + remoteBrokerName + ") was interrupted during establishment.");
+                LOG.warn("Network connection between " + localBroker + " and " + remoteBroker
+ "(" + remoteBrokerName
+                    + ") was interrupted during establishment.");
             }
         }
     }
@@ -374,12 +377,11 @@ public abstract class DemandForwardingBr
                 producerInfo = new ProducerInfo(remoteSessionInfo, 1);
                 producerInfo.setResponseRequired(false);
                 remoteBroker.oneway(producerInfo);
-                // Listen to consumer advisory messages on the remote broker to
-                // determine demand.
+                // Listen to consumer advisory messages on the remote broker to determine
demand.
                 if (!configuration.isStaticBridge()) {
                     demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
-                    // always dispatch advisory message asynchronously so that we never block
the producer
-                    // broker if we are slow
+                    // always dispatch advisory message asynchronously so that
+                    // we never block the producer broker if we are slow
                     demandConsumerInfo.setDispatchAsync(true);
                     String advisoryTopic = configuration.getDestinationFilter();
                     if (configuration.isBridgeTempDestinations()) {
@@ -488,17 +490,15 @@ public abstract class DemandForwardingBr
                         IntrospectionSupport.getProperties(configuration, props, null);
                         if (configuration.getExcludedDestinations() != null) {
                             excludedDestinations = configuration.getExcludedDestinations().toArray(
-                                    new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+                                new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
                         }
                         if (configuration.getStaticallyIncludedDestinations() != null) {
                             staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
-                                    new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+                                new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
                         }
                         if (configuration.getDynamicallyIncludedDestinations() != null) {
-                            dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
-                                    .toArray(
-                                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
-                                                    .size()]);
+                            dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
+                                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
                         }
                     } catch (Throwable t) {
                         LOG.error("Error mapping remote destinations", t);
@@ -514,7 +514,7 @@ public abstract class DemandForwardingBr
                 } else {
                     if (isDuplex()) {
                         if (LOG.isTraceEnabled()) {
-                            LOG.trace(configuration.getBrokerName() + " duplex command type:
"+ command.getCommandId());
+                            LOG.trace(configuration.getBrokerName() + " duplex command type:
" + command.getCommandId());
                         }
                         if (command.isMessage()) {
                             final ActiveMQMessage message = (ActiveMQMessage) command;
@@ -526,11 +526,13 @@ public abstract class DemandForwardingBr
                                 if (!isPermissableDestination(message.getDestination(), true))
{
                                     return;
                                 }
-                                // message being forwarded - we need to propagate the response
to our local send
+                                // message being forwarded - we need to
+                                // propagate the response to our local send
                                 message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
                                 if (message.isResponseRequired() || configuration.isAlwaysSyncSend())
{
                                     duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback()
{
                                         final int correlationId = message.getCommandId();
+
                                         @Override
                                         public void onCompletion(FutureResponse resp) {
                                             try {
@@ -641,10 +643,12 @@ public abstract class DemandForwardingBr
 
             if (path != null && path.length >= networkTTL) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
+ ", restricted to " + networkTTL + " network hops only : " + info);
+                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
+ ", restricted to " + networkTTL
+                        + " network hops only : " + info);
                 }
                 return;
             }
+
             if (contains(path, localBrokerPath[0])) {
                 // Ignore this consumer as it's a consumer we locally sent to the broker.
                 if (LOG.isDebugEnabled()) {
@@ -652,10 +656,12 @@ public abstract class DemandForwardingBr
                 }
                 return;
             }
+
             if (!isPermissableDestination(info.getDestination())) {
                 // ignore if not in the permitted or in the excluded list
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
+ ", destination " + info.getDestination() + " is not permiited :" + info);
+                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
+ ", destination " + info.getDestination()
+                        + " is not permiited :" + info);
                 }
                 return;
             }
@@ -669,7 +675,8 @@ public abstract class DemandForwardingBr
                     }
                 } else {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " +
remoteBrokerName + " as already subscribed to matching destination : " + info);
+                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " +
remoteBrokerName
+                            + " as already subscribed to matching destination : " + info);
                     }
                 }
             }
@@ -698,10 +705,12 @@ public abstract class DemandForwardingBr
             }
             destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
             if (LOG.isTraceEnabled()) {
-                LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation()
? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ",
destination: " + destInfo);
+                LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation()
? "add" : "remove") + " destination on " + localBroker
+                    + " from " + remoteBrokerName + ", destination: " + destInfo);
             }
             if (destInfo.isRemoveOperation()) {
-                // Serialize with removeSub operations such that all removeSub advisories
are generated
+                // Serialize with removeSub operations such that all removeSub advisories
+                // are generated
                 serialExecutor.execute(new Runnable() {
                     @Override
                     public void run() {
@@ -729,10 +738,11 @@ public abstract class DemandForwardingBr
     public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
 
         if (!disposed.get()) {
-            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException)error).isTemporary()
) {
-                // not a reason to terminate the bridge - temps can disappear with pending
sends as the demand sub may outlive the remote dest
+            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException)
error).isTemporary()) {
+                // not a reason to terminate the bridge - temps can disappear with
+                // pending sends as the demand sub may outlive the remote dest
                 if (messageDispatch != null) {
-                    LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId()
+ " on forwarding error: " +  error);
+                    LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId()
+ " on forwarding error: " + error);
                     try {
                         MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE,
1);
                         poisonAck.setPoisonCause(error);
@@ -742,7 +752,7 @@ public abstract class DemandForwardingBr
                     }
                     fireFailedForwardAdvisory(messageDispatch, error);
                 } else {
-                    LOG.warn("Ignoring exception on forwarding to non existent temp dest:
" +  error, error);
+                    LOG.warn("Ignoring exception on forwarding to non existent temp dest:
" + error, error);
                 }
                 return;
             }
@@ -771,9 +781,8 @@ public abstract class DemandForwardingBr
 
                     ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                     advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
-                    advisoryBroker.fireAdvisory(context,
-                            AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(),
-                            messageDispatch.getMessage(), null, advisoryMessage);
+                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(),
messageDispatch.getMessage(), null,
+                        advisoryMessage);
 
                 }
             } catch (Exception e) {
@@ -798,9 +807,8 @@ public abstract class DemandForwardingBr
     protected void removeSubscription(final DemandSubscription sub) throws IOException {
         if (sub != null) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace(configuration.getBrokerName() + " remove local subscription:"
-                        + sub.getLocalInfo().getConsumerId()
-                        + " for remote " + sub.getRemoteInfo().getConsumerId());
+                LOG.trace(configuration.getBrokerName() + " remove local subscription:" +
sub.getLocalInfo().getConsumerId() + " for remote "
+                    + sub.getRemoteInfo().getConsumerId());
             }
 
             // ensure not available for conduit subs pending removal
@@ -808,8 +816,8 @@ public abstract class DemandForwardingBr
             subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
 
             // continue removal in separate thread to free up this thread for outstanding
responses
-            // Serialize with removeDestination operations so that removeSubs are serialised
with removeDestinations
-            // such that all removeSub advisories are generated
+            // Serialize with removeDestination operations so that removeSubs are serialized
with
+            // removeDestinations such that all removeSub advisories are generated
             serialExecutor.execute(new Runnable() {
                 @Override
                 public void run() {
@@ -852,7 +860,9 @@ public abstract class DemandForwardingBr
 
                         if (suppressMessageDispatch(md, sub)) {
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug(configuration.getBrokerName() + " message not forwarded
to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath:
" + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
+                                LOG.debug(configuration.getBrokerName() + " message not forwarded
to " + remoteBrokerName
+                                    + " because message came from there or fails networkTTL,
brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
+                                    + ", message: " + md.getMessage());
                             }
                             // still ack as it may be durable
                             try {
@@ -865,14 +875,15 @@ public abstract class DemandForwardingBr
 
                         Message message = configureMessage(md);
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("bridging (" + configuration.getBrokerName() + " ->
" + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) +
", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath:
" + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
+                            LOG.debug("bridging (" + configuration.getBrokerName() + " ->
" + remoteBrokerName + ") "
+                                + (LOG.isTraceEnabled() ? message : message.getMessageId())
+ ", consumer: " + md.getConsumerId() + ", destination "
+                                + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath())
+ ", message: " + message);
                         }
 
                         if (!configuration.isAlwaysSyncSend() && !message.isPersistent())
{
 
-                            // If the message was originally sent using async
-                            // send, we will preserve that QOS
-                            // by bridging it using an async send (small chance
+                            // If the message was originally sent using async send, we will
+                            // preserve that QOS by bridging it using an async send (small
chance
                             // of message loss).
                             try {
                                 remoteBroker.oneway(message);
@@ -884,9 +895,8 @@ public abstract class DemandForwardingBr
 
                         } else {
 
-                            // The message was not sent using async send, so we
-                            // should only ack the local
-                            // broker when we get confirmation that the remote
+                            // The message was not sent using async send, so we should only
+                            // ack the local broker when we get confirmation that the remote
                             // broker has received the message.
                             ResponseCallback callback = new ResponseCallback() {
                                 @Override
@@ -909,11 +919,11 @@ public abstract class DemandForwardingBr
                             };
 
                             remoteBroker.asyncRequest(message, callback);
-
                         }
                     } else {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("No subscription registered with this network bridge
for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
+                            LOG.debug("No subscription registered with this network bridge
for consumerId " + md.getConsumerId() + " for message: "
+                                + md.getMessage());
                         }
                     }
                 } else if (command.isBrokerInfo()) {
@@ -927,10 +937,10 @@ public abstract class DemandForwardingBr
                     serviceLocalException(ce.getException());
                 } else {
                     switch (command.getDataStructureType()) {
-                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
-                        break;
-                    default:
-                        LOG.warn("Unexpected local command: " + command);
+                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
+                            break;
+                        default:
+                            LOG.warn("Unexpected local command: " + command);
                     }
                 }
             } catch (Throwable e) {
@@ -940,10 +950,46 @@ public abstract class DemandForwardingBr
         }
     }
 
+    protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
+        synchronized (brokerInfoMutex) {
+            if (remoteBrokerId != null) {
+                if (remoteBrokerId.equals(localBrokerId)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(configuration.getBrokerName() + " disconnecting local loop
back connection for: " + remoteBrokerName + ", with id:"
+                            + remoteBrokerId);
+                    }
+                    safeWaitUntilStarted();
+                    ServiceSupport.dispose(this);
+                }
+            }
+        }
+    }
+
+    protected void serviceRemoteBrokerInfo(Command command) throws IOException {
+        synchronized (brokerInfoMutex) {
+            BrokerInfo remoteBrokerInfo = (BrokerInfo) command;
+            remoteBrokerId = remoteBrokerInfo.getBrokerId();
+            remoteBrokerPath[0] = remoteBrokerId;
+            remoteBrokerName = remoteBrokerInfo.getBrokerName();
+            if (localBrokerId != null) {
+                if (localBrokerId.equals(remoteBrokerId)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(configuration.getBrokerName() + " disconnecting remote
loop back connection for: " + remoteBrokerName + ", with id:"
+                            + remoteBrokerId);
+                    }
+                    ServiceSupport.dispose(this);
+                }
+            }
+            if (!disposed.get()) {
+                triggerLocalStartBridge();
+            }
+        }
+    }
+
     private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws
Exception {
         boolean suppress = false;
-        // for durable subs, suppression via filter leaves dangling acks so we need to
-        // check here and allow the ack irrespective
+        // for durable subs, suppression via filter leaves dangling acks so we
+        // need to check here and allow the ack irrespective
         if (sub.getLocalInfo().isDurable()) {
             MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
             messageEvalContext.setMessageReference(md.getMessage());
@@ -953,96 +999,10 @@ public abstract class DemandForwardingBr
         return suppress;
     }
 
-    /**
-     * @return Returns the dynamicallyIncludedDestinations.
-     */
-    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
-        return dynamicallyIncludedDestinations;
-    }
-
-    /**
-     * @param dynamicallyIncludedDestinations The
-     *            dynamicallyIncludedDestinations to set.
-     */
-    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations)
{
-        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
-    }
-
-    /**
-     * @return Returns the excludedDestinations.
-     */
-    public ActiveMQDestination[] getExcludedDestinations() {
-        return excludedDestinations;
-    }
-
-    /**
-     * @param excludedDestinations The excludedDestinations to set.
-     */
-    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
-        this.excludedDestinations = excludedDestinations;
-    }
-
-    /**
-     * @return Returns the staticallyIncludedDestinations.
-     */
-    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
-        return staticallyIncludedDestinations;
-    }
-
-    /**
-     * @param staticallyIncludedDestinations The staticallyIncludedDestinations
-     *            to set.
-     */
-    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations)
{
-        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
-    }
-
-    /**
-     * @return Returns the durableDestinations.
-     */
-    public ActiveMQDestination[] getDurableDestinations() {
-        return durableDestinations;
-    }
-
-    /**
-     * @param durableDestinations The durableDestinations to set.
-     */
-    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
-        this.durableDestinations = durableDestinations;
-    }
-
-    /**
-     * @return Returns the localBroker.
-     */
-    public Transport getLocalBroker() {
-        return localBroker;
-    }
-
-    /**
-     * @return Returns the remoteBroker.
-     */
-    public Transport getRemoteBroker() {
-        return remoteBroker;
-    }
-
-    /**
-     * @return the createdByDuplex
-     */
-    public boolean isCreatedByDuplex() {
-        return this.createdByDuplex;
-    }
-
-    /**
-     * @param createdByDuplex the createdByDuplex to set
-     */
-    public void setCreatedByDuplex(boolean createdByDuplex) {
-        this.createdByDuplex = createdByDuplex;
-    }
-
     public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
         if (brokerPath != null) {
-            for (int i = 0; i < brokerPath.length; i++) {
-                if (brokerId.equals(brokerPath[i])) {
+            for (BrokerId id : brokerPath) {
+                if (brokerId.equals(id)) {
                     return true;
                 }
             }
@@ -1086,10 +1046,9 @@ public abstract class DemandForwardingBr
 
         ActiveMQDestination[] dests = staticallyIncludedDestinations;
         if (dests != null && dests.length > 0) {
-            for (int i = 0; i < dests.length; i++) {
-                ActiveMQDestination match = dests[i];
-                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
-                if (match != null && inclusionFilter.matches(destination) &&
dests[i].getDestinationType() == destination.getDestinationType()) {
+            for (ActiveMQDestination dest : dests) {
+                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && inclusionFilter.matches(destination) &&
dest.getDestinationType() == destination.getDestinationType()) {
                     return true;
                 }
             }
@@ -1097,10 +1056,9 @@ public abstract class DemandForwardingBr
 
         dests = excludedDestinations;
         if (dests != null && dests.length > 0) {
-            for (int i = 0; i < dests.length; i++) {
-                ActiveMQDestination match = dests[i];
-                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
-                if (match != null && exclusionFilter.matches(destination) &&
dests[i].getDestinationType() == destination.getDestinationType()) {
+            for (ActiveMQDestination dest : dests) {
+                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && exclusionFilter.matches(destination) &&
dest.getDestinationType() == destination.getDestinationType()) {
                     return false;
                 }
             }
@@ -1108,10 +1066,9 @@ public abstract class DemandForwardingBr
 
         dests = dynamicallyIncludedDestinations;
         if (dests != null && dests.length > 0) {
-            for (int i = 0; i < dests.length; i++) {
-                ActiveMQDestination match = dests[i];
-                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
-                if (match != null && inclusionFilter.matches(destination) &&
dests[i].getDestinationType() == destination.getDestinationType()) {
+            for (ActiveMQDestination dest : dests) {
+                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
+                if (dest != null && inclusionFilter.matches(destination) &&
dest.getDestinationType() == destination.getDestinationType()) {
                     return true;
                 }
             }
@@ -1127,8 +1084,7 @@ public abstract class DemandForwardingBr
     protected void setupStaticDestinations() {
         ActiveMQDestination[] dests = staticallyIncludedDestinations;
         if (dests != null) {
-            for (int i = 0; i < dests.length; i++) {
-                ActiveMQDestination dest = dests[i];
+            for (ActiveMQDestination dest : dests) {
                 DemandSubscription sub = createDemandSubscription(dest);
                 try {
                     addSubscription(sub);
@@ -1164,21 +1120,21 @@ public abstract class DemandForwardingBr
     }
 
     /*
-     * check our existing subs networkConsumerIds against the list of network ids in this
subscription
-     * A match means a duplicate which we suppress for topics and maybe for queues
+     * check our existing subs networkConsumerIds against the list of network
+     * ids in this subscription. A match means a duplicate which we suppress for
+     * topics and maybe for queues
      */
     private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
         final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
         boolean suppress = false;
 
-        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
||
-                consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions())
{
+        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
|| consumerInfo.getDestination().isTopic()
+            && !configuration.isSuppressDuplicateTopicSubscriptions()) {
             return suppress;
         }
 
         List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
-        Collection<Subscription> currentSubs =
-            getRegionSubscriptions(consumerInfo.getDestination());
+        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
         for (Subscription sub : currentSubs) {
             List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
             if (!networkConsumers.isEmpty()) {
@@ -1196,7 +1152,7 @@ public abstract class DemandForwardingBr
     }
 
     private boolean isInActiveDurableSub(Subscription sub) {
-        return  (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription
&& !((DurableTopicSubscription)sub).isActive());
+        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription
&& !((DurableTopicSubscription) sub).isActive());
     }
 
     private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo)
{
@@ -1204,9 +1160,9 @@ public abstract class DemandForwardingBr
 
         if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority())
{
             if (LOG.isDebugEnabled()) {
-                LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription
from " + remoteBrokerName
-                        + ", sub: " + candidateInfo + " is duplicated by network subscription
with equal or higher network priority: "
-                        + existingSub  + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
+                LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription
from " + remoteBrokerName + ", sub: " + candidateInfo
+                    + " is duplicated by network subscription with equal or higher network
priority: " + existingSub + ", networkConsumerIds: "
+                    + existingSub.getConsumerInfo().getNetworkConsumerIds());
             }
             suppress = true;
         } else {
@@ -1215,10 +1171,9 @@ public abstract class DemandForwardingBr
                 removeDuplicateSubscription(existingSub);
 
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription
" + existingSub.getConsumerInfo()
-                            + " with sub from " + remoteBrokerName
-                            + ", which has a higher priority, new sub: " + candidateInfo
+ ", networkComsumerIds: "
-                            + candidateInfo.getNetworkConsumerIds());
+                    LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription
" + existingSub.getConsumerInfo() + " with sub from "
+                        + remoteBrokerName + ", which has a higher priority, new sub: " +
candidateInfo + ", networkComsumerIds: "
+                        + candidateInfo.getNetworkConsumerIds());
                 }
             } catch (IOException e) {
                 LOG.error("Failed to remove duplicated sub as a result of sub with higher
priority, sub: " + existingSub, e);
@@ -1252,26 +1207,22 @@ public abstract class DemandForwardingBr
         Collection<Subscription> subs;
 
         region = null;
-        switch ( dest.getDestinationType() )
-        {
+        switch (dest.getDestinationType()) {
             case ActiveMQDestination.QUEUE_TYPE:
                 region = region_broker.getQueueRegion();
                 break;
-
             case ActiveMQDestination.TOPIC_TYPE:
                 region = region_broker.getTopicRegion();
                 break;
-
             case ActiveMQDestination.TEMP_QUEUE_TYPE:
                 region = region_broker.getTempQueueRegion();
                 break;
-
             case ActiveMQDestination.TEMP_TOPIC_TYPE:
                 region = region_broker.getTempTopicRegion();
                 break;
         }
 
-        if ( region instanceof AbstractRegion ) {
+        if (region instanceof AbstractRegion) {
             subs = ((AbstractRegion) region).getSubscriptions().values();
         } else {
             subs = null;
@@ -1281,7 +1232,7 @@ public abstract class DemandForwardingBr
     }
 
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException
{
-        //add our original id to ourselves
+        // add our original id to ourselves
         info.addNetworkConsumerId(info.getConsumerId());
         return doCreateDemandSubscription(info);
     }
@@ -1318,8 +1269,8 @@ public abstract class DemandForwardingBr
         // Indicate that this subscription is being made on behalf of the remote broker.
         info.setBrokerPath(new BrokerId[] { remoteBrokerId });
 
-        // the remote info held by the DemandSubscription holds the original consumerId,
-        // the local info get's overwritten
+        // the remote info held by the DemandSubscription holds the original
+        // consumerId, the local info gets overwritten
         info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
         DemandSubscription result = null;
         try {
@@ -1331,7 +1282,7 @@ public abstract class DemandForwardingBr
     }
 
     protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub)
throws IOException {
-        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())){
+        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
             sub.getLocalInfo().setDispatchAsync(true);
         } else {
             sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
@@ -1345,7 +1296,7 @@ public abstract class DemandForwardingBr
             // This works for now since we use a VM connection to the local broker.
             // may need to change if we ever subscribe to a remote broker.
             sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
-        } else  {
+        } else {
             // need to ack this message if it is ignored as it is durable so
             // we check before we send. see: suppressMessageDispatch()
         }
@@ -1354,7 +1305,8 @@ public abstract class DemandForwardingBr
     protected void removeDemandSubscription(ConsumerId id) throws IOException {
         DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
         if (LOG.isDebugEnabled()) {
-            LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker
+ " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
+            LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker
+ " from " + remoteBrokerName + " , consumer id: " + id
+                + ", matching sub: " + sub);
         }
         if (sub != null) {
             removeSubscription(sub);
@@ -1379,8 +1331,8 @@ public abstract class DemandForwardingBr
     }
 
     /**
-     * Performs a timed wait on the started latch and then checks for disposed before performing
-     * another wait each time the the started wait times out.
+     * Performs a timed wait on the started latch and then checks for disposed
+     * before performing another wait each time the started wait times out.
      *
      * @throws InterruptedException
      */
@@ -1403,45 +1355,11 @@ public abstract class DemandForwardingBr
         return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
     }
 
-    protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
-        synchronized (brokerInfoMutex) {
-            if (remoteBrokerId != null) {
-                if (remoteBrokerId.equals(localBrokerId)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(configuration.getBrokerName() + " disconnecting local loop
back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
-                    }
-                    safeWaitUntilStarted();
-                    ServiceSupport.dispose(this);
-                }
-            }
-        }
-    }
-
     protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
         info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
     }
 
-    protected void serviceRemoteBrokerInfo(Command command) throws IOException {
-        synchronized (brokerInfoMutex) {
-            BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
-            remoteBrokerId = remoteBrokerInfo.getBrokerId();
-            remoteBrokerPath[0] = remoteBrokerId;
-            remoteBrokerName = remoteBrokerInfo.getBrokerName();
-            if (localBrokerId != null) {
-                if (localBrokerId.equals(remoteBrokerId)) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(configuration.getBrokerName() + " disconnecting remote
loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
-                    }
-                    ServiceSupport.dispose(this);
-                }
-            }
-            if (!disposed.get()) {
-                triggerLocalStartBridge();
-            }
-        }
-    }
-
-    protected  BrokerId[] getRemoteBrokerPath() {
+    protected BrokerId[] getRemoteBrokerPath() {
         return remoteBrokerPath;
     }
 
@@ -1457,6 +1375,95 @@ public abstract class DemandForwardingBr
         }
     }
 
+    /**
+     * @return Returns the dynamicallyIncludedDestinations.
+     */
+    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
+        return dynamicallyIncludedDestinations;
+    }
+
+    /**
+     * @param dynamicallyIncludedDestinations
+     *            The dynamicallyIncludedDestinations to set.
+     */
+    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations)
{
+        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+    }
+
+    /**
+     * @return Returns the excludedDestinations.
+     */
+    public ActiveMQDestination[] getExcludedDestinations() {
+        return excludedDestinations;
+    }
+
+    /**
+     * @param excludedDestinations
+     *            The excludedDestinations to set.
+     */
+    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
+        this.excludedDestinations = excludedDestinations;
+    }
+
+    /**
+     * @return Returns the staticallyIncludedDestinations.
+     */
+    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
+        return staticallyIncludedDestinations;
+    }
+
+    /**
+     * @param staticallyIncludedDestinations
+     *            The staticallyIncludedDestinations to set.
+     */
+    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations)
{
+        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
+    }
+
+    /**
+     * @return Returns the durableDestinations.
+     */
+    public ActiveMQDestination[] getDurableDestinations() {
+        return durableDestinations;
+    }
+
+    /**
+     * @param durableDestinations
+     *            The durableDestinations to set.
+     */
+    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
+        this.durableDestinations = durableDestinations;
+    }
+
+    /**
+     * @return Returns the localBroker.
+     */
+    public Transport getLocalBroker() {
+        return localBroker;
+    }
+
+    /**
+     * @return Returns the remoteBroker.
+     */
+    public Transport getRemoteBroker() {
+        return remoteBroker;
+    }
+
+    /**
+     * @return the createdByDuplex
+     */
+    public boolean isCreatedByDuplex() {
+        return this.createdByDuplex;
+    }
+
+    /**
+     * @param createdByDuplex
+     *            the createdByDuplex to set
+     */
+    public void setCreatedByDuplex(boolean createdByDuplex) {
+        this.createdByDuplex = createdByDuplex;
+    }
+
     @Override
     public String getRemoteAddress() {
         return remoteBroker.getRemoteAddress();



Mime
View raw message