activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r395810 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Date Fri, 21 Apr 2006 08:02:21 GMT
Author: rajdavies
Date: Fri Apr 21 01:02:19 2006
New Revision: 395810

URL: http://svn.apache.org/viewcvs?rev=395810&view=rev
Log:
put back support for request/reply across networks

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=395810&r1=395809&r2=395810&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Apr 21 01:02:19 2006
@@ -22,6 +22,7 @@
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -31,6 +32,7 @@
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.Message;
@@ -219,7 +221,11 @@
                             +destinationFilter));
             demandConsumerInfo.setPrefetchSize(prefetchSize);
             remoteBroker.oneway(demandConsumerInfo);
-
+            //we want infomation about Destinations as well
+            ConsumerInfo destinationInfo  = new ConsumerInfo(remoteSessionInfo,2);
+            destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+            destinationInfo.setPrefetchSize(prefetchSize);
+            remoteBroker.oneway(destinationInfo);
             startedLatch.countDown();
         }
     }
@@ -331,7 +337,34 @@
                     log.trace("Ignoring sub " + info + " already subscribed to matching destination");
             }
         }
-        if(data.getClass()==RemoveInfo.class){
+        else if (data.getClass()==DestinationInfo.class){
+//          It's a destination info - we want to pass up
+            //infomation about temporary destinations 
+            DestinationInfo destInfo = (DestinationInfo) data;
+            BrokerId[] path=destInfo.getBrokerPath();
+            if((path!=null&&path.length>= networkTTL)){
+                if(log.isTraceEnabled())
+                    log.trace("Ignoring Subscription " + destInfo + " restricted to " + networkTTL + " network hops only");
+                return;
+            }
+            if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
+                // Ignore this consumer as it's a consumer we locally sent to the broker.
+                if(log.isTraceEnabled())
+                    log.trace("Ignoring sub " + destInfo + " already routed through this broker once");
+                return;
+            }
+            
+            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
+            if (destInfo.getDestination() instanceof ActiveMQTempDestination){
+                //re-set connection id so comes from here
+                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
+                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
+            }
+            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
+            localBroker.oneway(destInfo);
+            
+        }
+        else if(data.getClass()==RemoveInfo.class){
             ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
         }
@@ -761,6 +794,8 @@
     protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
 
     protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
+    
+    protected abstract BrokerId[] getRemoteBrokerPath();
 
 	public String getPassword() {
 		return password;



Mime
View raw message