activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r763565 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network: ConduitBridge.java DemandForwardingBridge.java DemandForwardingBridgeSupport.java
Date Thu, 09 Apr 2009 09:05:17 GMT
Author: gtully
Date: Thu Apr  9 09:05:17 2009
New Revision: 763565

URL: http://svn.apache.org/viewvc?rev=763565&view=rev
Log:
resolve AMQ-2199|https://issues.apache.org/activemq/browse/AMQ-2199 add a latch for the event
that initialises the localbrokerid and path

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=763565&r1=763564&r2=763565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
Thu Apr  9 09:05:17 2009
@@ -63,10 +63,15 @@
         for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
             DemandSubscription ds = (DemandSubscription)i.next();
             if (filter.matches(ds.getLocalInfo().getDestination())) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(configuration.getBrokerName() + " matched exsting sub (add
interest) for : " + ds.getRemoteInfo()
+                            + " with sub: " + info);
+                }
                 // add the interest in the subscription
                 // ds.add(ds.getRemoteInfo().getConsumerId());
                 ds.add(info.getConsumerId());
                 matched = true;
+                
                 // continue - we want interest to any existing
                 // DemandSubscriptions
             }
@@ -82,6 +87,10 @@
             ds.remove(id);
             if (ds.isEmpty()) {
                 tmpList.add(ds);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(configuration.getBrokerName() + " removing interest in sub
on " + localBroker + " from " + remoteBrokerName + " :  " + ds.getRemoteInfo());
+                }
             }
         }
         for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=763565&r1=763564&r2=763565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Thu Apr  9 09:05:17 2009
@@ -76,6 +76,7 @@
         synchronized (brokerInfoMutex) {
             localBrokerId = ((BrokerInfo)command).getBrokerId();
             localBrokerPath[0] = localBrokerId;
+            localBrokerIdKnownLatch.countDown();
             if (remoteBrokerId != null) {
                 if (remoteBrokerId.equals(localBrokerId)) {
                     if (LOG.isTraceEnabled()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=763565&r1=763564&r2=763565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Thu Apr  9 09:05:17 2009
@@ -115,6 +115,7 @@
     protected CountDownLatch startedLatch = new CountDownLatch(2);
     protected CountDownLatch localStartedLatch = new CountDownLatch(1);
     protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
+    protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
     protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
     protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
     protected NetworkBridgeConfiguration configuration;
@@ -621,10 +622,6 @@
         }
     }
 
-    protected DemandSubscription getDemandSubscription(MessageDispatch md) {
-        return subscriptionMapByLocalId.get(md.getConsumerId());
-    }
-
     protected Message configureMessage(MessageDispatch md) {
         Message message = md.getMessage().copy();
         // Update the packet to show where it came from.
@@ -643,16 +640,15 @@
             try {
                 if (command.isMessageDispatch()) {
                     enqueueCounter.incrementAndGet();
-                    //localStartedLatch.await();
-                    final MessageDispatch md = (MessageDispatch)command;
+                    final MessageDispatch md = (MessageDispatch)command;   
                     DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                     if (sub != null && md.getMessage()!=null) {
                     	
-                    	  // See if this consumer's brokerPath tells us it came from the broker
at the other end
-                    	  // of the bridge. I think we should be making this decision based
on the message's
-                    	  // broker bread crumbs and not the consumer's? However, the message's
broker bread
-                    	  // crumbs are null, which is another matter.   
-                    	  boolean cameFromRemote = false;
+                        // See if this consumer's brokerPath tells us it came from the broker
at the other end
+                        // of the bridge. I think we should be making this decision based
on the message's
+                        // broker bread crumbs and not the consumer's? However, the message's
broker bread
+                        // crumbs are null, which is another matter.   
+                        boolean cameFromRemote = false;
                         Object consumerInfo = md.getMessage().getDataStructure(); 
                         if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo)
)                  	                   	  
                     	    cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());
                   	                           
@@ -661,7 +657,7 @@
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("bridging " + configuration.getBrokerName() + " ->
" + remoteBrokerName + ": " + message);
                         }
-
+                        
                         if (!message.isResponseRequired()) {
 
                             // If the message was originally sent using async
@@ -672,9 +668,10 @@
                             // Don't send it off to the remote if it originally came from
the remote. 
                             if( !cameFromRemote ) {
                                remoteBroker.oneway(message);
-                              }
-                            else{
-                              LOG.info("Message not forwarded on to remote, because message
came from remote");                               
+                            } else {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Message not forwarded on to remote, because
message came from remote");                               
+                                }
                             }
                             localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1));
                             dequeueCounter.incrementAndGet();                          
@@ -1060,16 +1057,20 @@
 
     protected void removeDemandSubscription(ConsumerId id) throws IOException {
         DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker
+ " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
+        }
         if (sub != null) {
             removeSubscription(sub);
             if (LOG.isDebugEnabled()) {
-                LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker
+ " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
+                LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker
+ " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
             }
         }
     }
 
     protected void waitStarted() throws InterruptedException {
         startedLatch.await();
+        localBrokerIdKnownLatch.await();
     }
 
     protected void clearDownSubscriptions() {



Mime
View raw message