activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r388714 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache...
Date Sat, 25 Mar 2006 06:55:35 GMT
Author: rajdavies
Date: Fri Mar 24 22:55:33 2006
New Revision: 388714

URL: http://svn.apache.org/viewcvs?rev=388714&view=rev
Log:
Fix for http://jira.activemq.org/jira/browse/AMQ-487

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Fri Mar 24 22:55:33 2006
@@ -136,7 +136,8 @@
     }
     
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination)
throws Exception {
-        Destination answer = next.addDestination(context, destination);        
+        Destination answer = next.addDestination(context, destination);  
+      
         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
         DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE,
destination);
         fireAdvisory(context, topic, info);        
@@ -152,6 +153,21 @@
             info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
             fireAdvisory(context, topic, info);
         }
+    }
+    
+    public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        ActiveMQDestination destination =  info.getDestination();
+        next.addDestinationInfo(context, info);  
+        
+        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
+        fireAdvisory(context, topic, info);        
+        destinations.put(destination, info);    
+    }
+
+    public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        next.removeDestinationInfo(context, info);
+        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(info.getDestination());
+        fireAdvisory(context, topic, info);        
     }
 
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable
error) throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
Fri Mar 24 22:55:33 2006
@@ -139,24 +139,24 @@
         this.processDispatch(connector.getBrokerInfo());
     }
 
-    public void stop() throws Exception {
-        if( disposed) 
+    public void stop() throws Exception{
+        if(disposed)
             return;
-        
         disposed=true;
         //
         // Remove all logical connection associated with this connection
         // from the broker.
-        ArrayList l = new ArrayList(connectionStates.keySet());
-        for (Iterator iter = l.iterator(); iter.hasNext();) {
-            ConnectionId connectionId = (ConnectionId) iter.next();
-            try {
-                processRemoveConnection(connectionId);
-            } catch (Throwable ignore) {
+        if(!broker.isStopped()){
+            ArrayList l=new ArrayList(connectionStates.keySet());
+            for(Iterator iter=l.iterator();iter.hasNext();){
+                ConnectionId connectionId=(ConnectionId) iter.next();
+                try{
+                    processRemoveConnection(connectionId);
+                }catch(Throwable ignore){}
+            }
+            if(brokerInfo!=null){
+                broker.removeBroker(this,brokerInfo);
             }
-        }
-        if (brokerInfo != null){
-            broker.removeBroker(this, brokerInfo);
         }
     }
     
@@ -364,7 +364,7 @@
 
     public Response processAddDestination(DestinationInfo info) throws Exception {
         ConnectionState cs = lookupConnectionState(info.getConnectionId());
-        broker.addDestination(cs.getContext(), info.getDestination());
+        broker.addDestinationInfo(cs.getContext(), info);
         if( info.getDestination().isTemporary() ) {
             cs.addTempDestination(info.getDestination());
         }
@@ -373,7 +373,7 @@
 
     public Response processRemoveDestination(DestinationInfo info) throws Exception {
         ConnectionState cs = lookupConnectionState(info.getConnectionId());
-        broker.removeDestination(cs.getContext(), info.getDestination(), info.getTimeout());
+        broker.removeDestinationInfo(cs.getContext(), info);
         if( info.getDestination().isTemporary() ) {
             cs.removeTempDestination(info.getDestination());
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
Fri Mar 24 22:55:33 2006
@@ -18,11 +18,13 @@
 
 import java.util.Set;
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
@@ -213,5 +215,23 @@
      * @return a Set of all durable destinations
      */
     public Set getDurableDestinations();
+    
+    /**
+     * Add and process a DestinationInfo object
+     * @param context
+     * @param info
+     * @throws Exception
+     */
+    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws
Exception;
+    
+    
+    /**
+     * Remove and process a DestinationInfo object
+     * @param context
+     * @param info
+     * @throws Exception
+     */
+    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws
Exception;
+
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Fri Mar 24 22:55:33 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -192,6 +193,16 @@
     
     public Set getDurableDestinations(){
         return next.getDurableDestinations();
+    }
+    
+    public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        next.addDestinationInfo(context, info);
+        
+    }
+
+    public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        next.removeDestinationInfo(context, info);
+        
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Fri Mar 24 22:55:33 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -192,4 +193,11 @@
         return null;
     }
 
+    public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+    }
+
+    public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{        
+    }
+    
+   
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Fri Mar 24 22:55:33 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -188,6 +189,16 @@
     
     public Set getDurableDestinations(){
         throw new IllegalStateException(this.message);
+    }
+
+    public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        throw new IllegalStateException(this.message);
+        
+    }
+
+    public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        throw new IllegalStateException(this.message);
+        
     }
     
    

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Fri Mar 24 22:55:33 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -202,6 +203,16 @@
     
     public Set getDurableDestinations(){
         return getNext().getDurableDestinations();
+    }
+
+    public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        getNext().addDestinationInfo(context, info);
+        
+    }
+
+    public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        getNext().removeDestinationInfo(context, info);
+        
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Fri Mar 24 22:55:33 2006
@@ -29,6 +29,7 @@
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -182,8 +183,10 @@
     }
 
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination)
throws Exception {
-        if( destinations.contains(destination) )
+        if( destinations.contains(destination) ){
+            System.err.println(brokerService.getBrokerName() + " SPLATYTTTT!!!!");
             throw new JMSException("Destination already exists: "+destination);
+        }
         
         Destination answer = null;
         switch(destination.getDestinationType()) {
@@ -229,6 +232,16 @@
         }
         
         destinations.remove(destination);
+    }
+    
+    public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        addDestination(context,info.getDestination());
+        
+    }
+
+    public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws
Exception{
+        removeDestination(context,info.getDestination(), info.getTimeout());
+        
     }
 
     public ActiveMQDestination[] getDestinations() throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
Fri Mar 24 22:55:33 2006
@@ -71,6 +71,10 @@
     public String getConnectionId() {
         return connectionId;
     }
+    
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+    }
 
     public int getSequenceId() {
         return sequenceId;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
Fri Mar 24 22:55:33 2006
@@ -99,5 +99,9 @@
     protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException
{
         return new NetworkBridgeFilter(getFromBrokerId(info), networkTTL);
     }
+    
+    protected BrokerId[] getRemoteBrokerPath(){
+        return remoteBrokerPath;
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Fri Mar 24 22:55:33 2006
@@ -60,7 +60,7 @@
     }
 
     protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
-        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath));
+        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),getRemoteBrokerPath()));
     }
 
     protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
@@ -79,5 +79,9 @@
     
     protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException
{
         return new NetworkBridgeFilter(remoteBrokerPath[0], networkTTL);
+    }
+    
+    protected BrokerId[] getRemoteBrokerPath(){
+        return remoteBrokerPath;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Mar 24 22:55:33 2006
@@ -22,6 +22,7 @@
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -31,6 +32,7 @@
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.Message;
@@ -55,6 +57,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
+import javax.jms.TemporaryTopic;
 
 /**
  * A useful base class for implementing demand forwarding bridges.
@@ -211,7 +214,13 @@
                             +destinationFilter));
             demandConsumerInfo.setPrefetchSize(prefetchSize);
             remoteBroker.oneway(demandConsumerInfo);
-
+            
+            //we want infomation about Destinations as well
+            ConsumerInfo destinationInfo  = new ConsumerInfo(remoteSessionInfo,2);
+            destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+            destinationInfo.setPrefetchSize(prefetchSize);
+            remoteBroker.oneway(destinationInfo);
+            
             startedLatch.countDown();
         }
     }
@@ -322,6 +331,32 @@
                 if(log.isTraceEnabled())
                     log.trace("Ignoring sub " + info + " already subscribed to matching destination");
             }
+        }else if (data.getClass()==DestinationInfo.class){
+//          It's a destination info - we want to pass up
+            //infomation about temporary destinations 
+            DestinationInfo destInfo = (DestinationInfo) data;
+            BrokerId[] path=destInfo.getBrokerPath();
+            if((path!=null&&path.length>= networkTTL)){
+                if(log.isTraceEnabled())
+                    log.trace("Ignoring Subscription " + destInfo + " restricted to " + networkTTL
+ " network hops only");
+                return;
+            }
+            if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
+                // Ignore this consumer as it's a consumer we locally sent to the broker.
+                if(log.isTraceEnabled())
+                    log.trace("Ignoring sub " + destInfo + " already routed through this
broker once");
+                return;
+            }
+            
+            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
+            if (destInfo.getDestination() instanceof ActiveMQTempDestination){
+                //re-set connection id so comes from here
+                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
+                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
+            }
+            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
+            localBroker.oneway(destInfo);
+            
         }
         if(data.getClass()==RemoveInfo.class){
             ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
@@ -339,7 +374,8 @@
             localBroker.oneway(sub.getLocalInfo());
         }
     }
-
+    
+    
     protected void removeSubscription(DemandSubscription sub) throws IOException {
         if(sub!=null){
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
@@ -732,5 +768,7 @@
     protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
 
     protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
+    
+    protected abstract BrokerId[] getRemoteBrokerPath();
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
Fri Mar 24 22:55:33 2006
@@ -16,10 +16,16 @@
 import java.net.URI;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicRequestor;
+import javax.jms.TopicSession;
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -40,6 +46,37 @@
     protected ActiveMQTopic included;
     protected ActiveMQTopic excluded;
     protected String consumerName="durableSubs";
+    
+    
+    public void testRequestReply() throws Exception{
+        final MessageProducer remoteProducer=remoteSession.createProducer(null);
+        MessageConsumer remoteConsumer=remoteSession.createConsumer(included);
+        remoteConsumer.setMessageListener(new MessageListener(){
+            public void onMessage(Message msg){
+                try{
+                    TextMessage textMsg=(TextMessage) msg;
+                    String payload="REPLY: "+textMsg.getText();
+                    Destination replyTo;
+                    replyTo=msg.getJMSReplyTo();
+                    textMsg.clearBody();
+                    textMsg.setText(payload);
+                    remoteProducer.send(replyTo,textMsg);
+                }catch(JMSException e){
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+        });
+        
+        TopicRequestor requestor=new TopicRequestor((TopicSession) localSession,included);
+        Thread.sleep(2000);//alow for consumer infos to perculate arround
+        for (int i =0;i < MESSAGE_COUNT; i++){
+            TextMessage msg = localSession.createTextMessage("test msg: " +i);
+            TextMessage result = (TextMessage) requestor.request(msg);
+            assertNotNull(result);
+            System.out.println(result.getText());
+        }
+    }
 
     public void testFiltering() throws Exception{
         MessageConsumer includedConsumer=remoteSession.createConsumer(included);
@@ -93,6 +130,8 @@
             assertNotNull(remoteConsumer.receive(500));
         }
     }
+    
+    
 
     protected void setUp() throws Exception{
         super.setUp();
@@ -114,16 +153,19 @@
     }
 
     protected void doSetUp() throws Exception{
-        Resource resource=new ClassPathResource(getLocalBrokerURI());
+        Resource resource=new ClassPathResource(getRemoteBrokerURI());
         BrokerFactoryBean factory=new BrokerFactoryBean(resource);
         factory.afterPropertiesSet();
-        localBroker=factory.getBroker();
-        resource=new ClassPathResource(getRemoteBrokerURI());
+        remoteBroker=factory.getBroker();
+        remoteBroker.start();
+        
+        resource=new ClassPathResource(getLocalBrokerURI());
         factory=new BrokerFactoryBean(resource);
         factory.afterPropertiesSet();
-        remoteBroker=factory.getBroker();
+        localBroker=factory.getBroker();
+        
         localBroker.start();
-        remoteBroker.start();
+        
         URI localURI=localBroker.getVmConnectorURI();
         ActiveMQConnectionFactory fac=new ActiveMQConnectionFactory(localURI);
         localConnection=fac.createConnection();

Modified: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
Fri Mar 24 22:55:33 2006
@@ -23,7 +23,7 @@
     </transportConnectors>
 
     <networkConnectors>
-      <networkConnector uri="static://(tcp://localhost:61617)">
+      <networkConnector uri="static:failover:(tcp://localhost:61617)">
          dynamicOnly = false
          conduitSubscriptions = true
          decreaseNetworkConsumerPriority = false

Modified: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml
Fri Mar 24 22:55:33 2006
@@ -21,6 +21,9 @@
     <transportConnectors>
       <transportConnector uri="tcp://localhost:61617"/>
     </transportConnectors>
+    <networkConnectors>
+      <networkConnector uri="static:failover:(tcp://localhost:61616)"/>
+      </networkConnectors>
   </broker>
 
 </beans>



Mime
View raw message