activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r370223 [2/2] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/...
Date Wed, 18 Jan 2006 19:17:41 GMT
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Wed Jan 18 11:16:58 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -40,9 +41,9 @@
 
     private final PolicyMap policyMap;
 
-    public QueueRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public QueueRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
             PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
-        super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
         this.policyMap = policyMap;
     }
 
@@ -71,10 +72,10 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         if (info.isBrowser()) {
-            return new QueueBrowserSubscription(context, info);
+            return new QueueBrowserSubscription(broker,context, info);
         }
         else {
-            return new QueueSubscription(context, info);
+            return new QueueSubscription(broker,context, info);
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Jan 18 11:16:58 2006
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-
-import javax.jms.InvalidSelectorException;
-
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.command.ConsumerId;
@@ -27,12 +24,14 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.transaction.Synchronization;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import javax.jms.InvalidSelectorException;
+
+import java.io.IOException;
 
 public class QueueSubscription extends PrefetchSubscription {
     
-    public QueueSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        super(context, info);
+    public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+        super(broker,context, info);
     }
     
     public void add(MessageReference node) throws Throwable {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Wed Jan 18 11:16:58 2006
@@ -22,6 +22,7 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 
 /**
@@ -86,6 +87,13 @@
      * @param context the environment the operation is being executed under.
      */
     public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable;
+    
+    /**
+     * Process a notification of a dispatch - used by a Slave Broker
+     * @param messageDispatchNotification
+     * @throws Throwable
+     */
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable;
 
     public void gc();
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Jan 18 11:16:58 2006
@@ -20,15 +20,19 @@
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SessionInfo;
@@ -46,8 +50,8 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.*;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Routes Broker operations to the correct messaging regions for processing.
@@ -62,22 +66,25 @@
     private final Region topicRegion;
     private final Region tempQueueRegion;
     private final Region tempTopicRegion;
+    private BrokerService brokerService;
     
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     
     private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
     private final CopyOnWriteArraySet destinations = new CopyOnWriteArraySet();
+    private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList();
 
     private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();    
     private BrokerId brokerId;
     private String brokerName;
     private Map clientIdSet = new HashMap(); // we will synchronize access
 
-    public RegionBroker(TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
-        this(taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);
+    public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
+        this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);
     }
     
-    public RegionBroker(TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
+    public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
+        this.brokerService = brokerService;
         this.sequenceGenerator.setLastSequenceId( adapter.getLastMessageBrokerSequenceId() );
         
         queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, adapter, policyMap);
@@ -86,21 +93,28 @@
         tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory);
         tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory);        
     }
+    
+    public Broker getAdaptor(Class type){
+        if (type.isInstance(this)){
+            return this;
+        }
+        return null;
+    }
 
     protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        return new TempTopicRegion(destinationStatistics, memoryManager, taskRunnerFactory);
+        return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
     }
 
     protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        return new TempQueueRegion(destinationStatistics, memoryManager, taskRunnerFactory);
+        return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
     }
 
     protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) {
-        return new TopicRegion(destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
+        return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
     }
 
     protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) {
-        return new QueueRegion(destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
+        return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
     }
     
     private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException {
@@ -279,7 +293,6 @@
     }
 
     public void send(ConnectionContext context,  Message message) throws Throwable {
-        
         message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId());
         ActiveMQDestination destination = message.getDestination();
         switch(destination.getDestinationType()) {
@@ -385,5 +398,51 @@
     protected void throwUnknownDestinationType(ActiveMQDestination destination) throws JMSException {
         throw new JMSException("Unknown destination type: " + destination.getDestinationType());
     }
+
+    public synchronized void addBroker(Connection connection,BrokerInfo info){
+            brokerInfos.add(info);
+    }
+    
+    public synchronized void removeBroker(Connection connection,BrokerInfo info){
+        if (info != null){
+            brokerInfos.remove(info);
+        }   
+    }
+
+    public synchronized BrokerInfo[] getPeerBrokerInfos(){
+        BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
+        result = (BrokerInfo[])brokerInfos.toArray(result);
+        return result;
+    }
+    
+    public void processDispatch(MessageDispatch messageDispatch){
+        
+    }
+    
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable {
+        ActiveMQDestination destination = messageDispatchNotification.getDestination();
+        switch(destination.getDestinationType()) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            queueRegion.processDispatchNotification(messageDispatchNotification);
+            break;
+        case ActiveMQDestination.TOPIC_TYPE:
+            topicRegion.processDispatchNotification(messageDispatchNotification);
+            break;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            tempQueueRegion.processDispatchNotification(messageDispatchNotification);
+            break;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            tempTopicRegion.processDispatchNotification(messageDispatchNotification);
+            break;
+        default:
+            throwUnknownDestinationType(destination);
+        }
+    }
+    
+    public boolean isSlaveBroker(){
+        return brokerService.isSlave();
+    }
+
+    
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Wed Jan 18 11:16:58 2006
@@ -22,6 +22,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 /**
@@ -86,5 +87,16 @@
      * reclaim memory.
      */
     void gc();
+    
+    /**
+     * Used by a Slave Broker to update dispatch infomation
+     * @param mdn
+     */
+    void processMessageDispatchNotification(MessageDispatchNotification  mdn);
+    
+    /**
+     * @return true if the broker is currently in slave mode
+     */
+    boolean isSlaveBroker();
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Wed Jan 18 11:16:58 2006
@@ -19,6 +19,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTempDestination;
@@ -32,8 +33,8 @@
  */
 public class TempQueueRegion extends AbstractRegion {
 
-    public TempQueueRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        super(destinationStatistics, memoryManager, taskRunnerFactory, null);
+    public TempQueueRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
         setAutoCreateDestinations(false);
     }
 
@@ -55,9 +56,9 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         if( info.isBrowser() ) {
-            return new QueueBrowserSubscription(context, info);
+            return new QueueBrowserSubscription(broker,context, info);
         } else {
-            return new QueueSubscription(context, info);
+            return new QueueSubscription(broker,context, info);
         }
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Wed Jan 18 11:16:58 2006
@@ -18,6 +18,7 @@
 
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTempDestination;
@@ -31,8 +32,8 @@
  */
 public class TempTopicRegion extends AbstractRegion {
 
-    public TempTopicRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        super(destinationStatistics, memoryManager, taskRunnerFactory, null);
+    public TempTopicRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
         setAutoCreateDestinations(false);
     }
 
@@ -56,7 +57,7 @@
         if( info.isDurable() ) {
             throw new JMSException("A durable subscription cannot be created for a temporary topic.");
         } else {
-            return new TopicSubscription(context, info, this.memoryManager);
+            return new TopicSubscription(broker,context, info, this.memoryManager);
         }
     }
         

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Wed Jan 18 11:16:58 2006
@@ -18,6 +18,7 @@
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -50,9 +51,9 @@
     protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
     private final PolicyMap policyMap;
 
-    public TopicRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public TopicRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
             PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
-        super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
         this.policyMap = policyMap;
     }
 
@@ -168,7 +169,7 @@
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
             DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
             if (sub == null) {
-                sub = new DurableTopicSubscription(context, info);
+                sub = new DurableTopicSubscription(broker,context, info);
                 durableSubscriptions.put(key, sub);
             }
             else {
@@ -177,14 +178,14 @@
             return sub;
         }
         else {
-            return new TopicSubscription(context, info, memoryManager);
+            return new TopicSubscription(broker,context, info, memoryManager);
         }
     }
     
     public Subscription createDurableSubscription(SubscriptionInfo info) throws JMSException {
         SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
         DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
-        sub = new DurableTopicSubscription(info);
+        sub = new DurableTopicSubscription(broker,info);
         durableSubscriptions.put(key, sub);
         return sub;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Jan 18 11:16:58 2006
@@ -23,6 +23,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -30,6 +31,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.transaction.Synchronization;
 
@@ -41,18 +43,34 @@
     protected int dispatched=0;
     protected int delivered=0;
     
-    public TopicSubscription(ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException {
-        super(context, info);
+    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException {
+        super(broker,context, info);
         this.usageManager=usageManager;
     }
 
     public void add(MessageReference node) throws InterruptedException, IOException {
         node.incrementReferenceCount();
-        if( !isFull() ) {
+        if( !isFull() && !isSlaveBroker() ) {
             dispatch(node);
         } else {
+            synchronized(matched){
             matched.addLast(node);
+            }
         }        
+    }
+    
+    public void processMessageDispatchNotification(MessageDispatchNotification  mdn){
+        synchronized(matched){
+            for (Iterator i = matched.iterator(); i.hasNext();){
+                MessageReference node = (MessageReference)i.next();
+                if (node.getMessageId().equals(mdn.getMessageId())){
+                    i.remove();
+                    dispatched++;
+                    node.decrementReferenceCount();
+                    break;
+                }
+            }
+        }
     }
     
     public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java Wed Jan 18 11:16:58 2006
@@ -60,6 +60,10 @@
     public boolean isMessageAck() {
         return false;
     }
+    
+    public boolean isMessageDispatchNotification(){
+        return false;
+    }
 
     /**
      * @openwire:property version=1

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java Wed Jan 18 11:16:58 2006
@@ -42,6 +42,7 @@
     boolean isWireFormatInfo();
     boolean isMessage();
     boolean isMessageAck();
+    boolean isMessageDispatchNotification();
     
     Response visit( CommandVisitor visitor) throws Throwable;
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java Wed Jan 18 11:16:58 2006
@@ -112,6 +112,14 @@
     
     ///////////////////////////////////////////////////
     //
+    // Broker to Broker command objects
+    //
+    /////////////////////////////////////////////////// 
+    
+    byte  MESSAGE_DISPATCH_NOTIFICATION     = 90;
+    
+    ///////////////////////////////////////////////////
+    //
     // Data structures contained in the command objects.
     //
     ///////////////////////////////////////////////////    
@@ -129,6 +137,8 @@
     byte  CONSUMER_ID                       = 122;
     byte  PRODUCER_ID                       = 123;
     byte  BROKER_ID                         = 124;
+    
+   
 
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java Wed Jan 18 11:16:58 2006
@@ -76,4 +76,8 @@
         return false;
     }
 
+    public boolean isMessageDispatchNotification(){
+        return false;
+    }
+
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,90 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="90"
+ * @version $Revision$
+ */
+public class MessageDispatchNotification extends BaseCommand{
+
+    public static final byte DATA_STRUCTURE_TYPE=CommandTypes.MESSAGE_DISPATCH_NOTIFICATION;
+
+    protected ConsumerId consumerId;
+    protected ActiveMQDestination destination;
+    protected MessageId messageId;
+    protected long deliverySequenceId;
+
+    public byte getDataStructureType(){
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isMessageDispatchNotification(){
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId(){
+        return consumerId;
+    }
+    public void setConsumerId(ConsumerId consumerId){
+        this.consumerId=consumerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination(){
+        return destination;
+    }
+    public void setDestination(ActiveMQDestination destination){
+        this.destination=destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+
+    public long getDeliverySequenceId(){
+        return deliverySequenceId;
+    }
+    public void setDeliverySequenceId(long deliverySequenceId){
+        this.deliverySequenceId=deliverySequenceId;
+    }
+
+    public Response visit(CommandVisitor visitor) throws Throwable {
+        return visitor.processMessageDispatchNotification( this );
+    }
+    
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getMessageId(){
+        return messageId;
+    }
+
+    public void setMessageId(MessageId messageId){
+        this.messageId=messageId;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Wed Jan 18 11:16:58 2006
@@ -169,4 +169,8 @@
         return false;
     }
 
+    public boolean isMessageDispatchNotification(){
+        return false;
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java Wed Jan 18 11:16:58 2006
@@ -58,8 +58,8 @@
         add(new ConnectionErrorMarshaller());
         add(new ActiveMQObjectMessageMarshaller());
         add(new ConsumerInfoMarshaller());
-        add(new ConnectionIdMarshaller());
         add(new ActiveMQTempTopicMarshaller());
+        add(new ConnectionIdMarshaller());
         add(new DiscoveryEventMarshaller());
         add(new ConnectionInfoMarshaller());
         add(new KeepAliveInfoMarshaller());
@@ -74,6 +74,7 @@
         add(new ProducerInfoMarshaller());
         add(new SubscriptionInfoMarshaller());
         add(new ActiveMQMapMessageMarshaller());
+        add(new MessageDispatchNotificationMarshaller());
         add(new SessionInfoMarshaller());
         add(new ActiveMQMessageMarshaller());
         add(new TransactionInfoMarshaller());

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,108 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.openwire.v1;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.openwire.*;
+import org.apache.activemq.command.*;
+
+
+/**
+ * Marshalling code for Open Wire Format for MessageDispatchNotification
+ *
+ *
+ * NOTE!: This file is auto generated - do not modify!
+ *        if you need to make a change, please see the modify the groovy scripts in the
+ *        under src/gram/script and then use maven openwire:generate to regenerate 
+ *        this file.
+ *
+ * @version $Revision$
+ */
+public class MessageDispatchNotificationMarshaller extends BaseCommandMarshaller {
+
+    /**
+     * Return the type of Data Structure we marshal
+     * @return short representation of the type data structure
+     */
+    public byte getDataStructureType() {
+        return MessageDispatchNotification.DATA_STRUCTURE_TYPE;
+    }
+    
+    /**
+     * @return a new object instance
+     */
+    public DataStructure createObject() {
+        return new MessageDispatchNotification();
+    }
+
+    /**
+     * Un-marshal an object instance from the data input stream
+     *
+     * @param o the object to un-marshal
+     * @param dataIn the data input stream to build the object from
+     * @throws IOException
+     */
+    public void unmarshal(OpenWireFormat wireFormat, Object o, DataInputStream dataIn, BooleanStream bs) throws IOException {
+        super.unmarshal(wireFormat, o, dataIn, bs);
+
+        MessageDispatchNotification info = (MessageDispatchNotification)o;
+        info.setConsumerId((org.apache.activemq.command.ConsumerId) unmarsalCachedObject(wireFormat, dataIn, bs));
+        info.setDestination((org.apache.activemq.command.ActiveMQDestination) unmarsalCachedObject(wireFormat, dataIn, bs));
+        info.setDeliverySequenceId(unmarshalLong(wireFormat, dataIn, bs));
+        info.setMessageId((org.apache.activemq.command.MessageId) unmarsalNestedObject(wireFormat, dataIn, bs));
+
+    }
+
+
+    /**
+     * Write the booleans that this object uses to a BooleanStream
+     */
+    public int marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) throws IOException {
+
+        MessageDispatchNotification info = (MessageDispatchNotification)o;
+
+        int rc = super.marshal1(wireFormat, o, bs);
+        rc += marshal1CachedObject(wireFormat, info.getConsumerId(), bs);
+        rc += marshal1CachedObject(wireFormat, info.getDestination(), bs);
+        rc+=marshal1Long(wireFormat, info.getDeliverySequenceId(), bs);
+        rc += marshal1NestedObject(wireFormat, info.getMessageId(), bs);
+
+        return rc+0;
+    }
+
+    /**
+     * Write a object instance to data output stream
+     *
+     * @param o the instance to be marshaled
+     * @param dataOut the output stream
+     * @throws IOException thrown if an error occurs
+     */
+    public void marshal2(OpenWireFormat wireFormat, Object o, DataOutputStream dataOut, BooleanStream bs) throws IOException {
+        super.marshal2(wireFormat, o, dataOut, bs);
+
+        MessageDispatchNotification info = (MessageDispatchNotification)o;
+        marshal2CachedObject(wireFormat, info.getConsumerId(), dataOut, bs);
+        marshal2CachedObject(wireFormat, info.getDestination(), dataOut, bs);
+        marshal2Long(wireFormat, info.getDeliverySequenceId(), dataOut, bs);
+        marshal2NestedObject(wireFormat, info.getMessageId(), dataOut, bs);
+
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java Wed Jan 18 11:16:58 2006
@@ -26,6 +26,7 @@
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -70,6 +71,7 @@
     Response processRecoverTransactions(TransactionInfo info) throws Throwable;
     Response processForgetTransaction(TransactionInfo info) throws Throwable;
     Response processEndTransaction(TransactionInfo info) throws Throwable;
+    Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Throwable;
     
 }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Jan 18 11:16:58 2006
@@ -30,6 +30,7 @@
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -270,6 +271,10 @@
     }
 
     public Response processFlush(FlushCommand command) throws Throwable {
+        return null;
+    }
+    
+    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Throwable{
         return null;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Jan 18 11:16:58 2006
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.Random;
 
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.state.ConnectionStateTracker;
@@ -69,6 +70,8 @@
     private long maxReconnectDelay = 1000 * 30;
     private long backOffMultiplier = 2;
     private boolean useExponentialBackOff = true;
+    private boolean randomize = true;
+    private boolean initialized;
     private int maxReconnectAttempts;
     private int connectFailures;
     private long reconnectDelay = initialReconnectDelay;
@@ -79,6 +82,20 @@
             if (command.isResponse()) {
                 requestMap.remove(new Short(((Response) command).getCorrelationId()));
             }
+            if (!initialized){
+                if (command.isBrokerInfo()){
+                    BrokerInfo info = (BrokerInfo)command;
+                    BrokerInfo[] peers = info.getPeerBrokerInfos();
+                    if (peers!= null){
+                        for (int i =0; i < peers.length;i++){
+                            String brokerString = peers[i].getBrokerURL();
+                            add(brokerString);
+                        }
+                    }
+                initialized = true;
+                }
+                
+            }
             transportListener.onCommand(command);
         }
 
@@ -173,6 +190,7 @@
         synchronized (reconnectMutex) {
             log.debug("Transport failed, starting up reconnect task", e);
             if (connectedTransport != null) {
+                initialized = false;
                 ServiceSupport.dispose(connectedTransport);
                 connectedTransport = null;
                 connectedTransportURI = null;
@@ -256,6 +274,20 @@
         this.maxReconnectAttempts = maxReconnectAttempts;
     }
 
+    /**
+     * @return Returns the randomize.
+     */
+    public boolean isRandomize(){
+        return randomize;
+    }
+
+    /**
+     * @param randomize The randomize to set.
+     */
+    public void setRandomize(boolean randomize){
+        this.randomize=randomize;
+    }
+
     public void oneway(Command command) throws IOException {
         Exception error = null;
         try {
@@ -335,6 +367,19 @@
         }
         reconnect();
     }
+    
+    public void add(String u){
+        try {
+        URI uri = new URI(u);
+        if (!uris.contains(uri))
+            uris.add(uri);
+
+        reconnect();
+        }catch(Exception e){
+            log.error("Failed to parse URI: " + u);
+        }
+    }
+
 
     public void reconnect() {
         log.debug("Waking up reconnect task");
@@ -345,17 +390,18 @@
         }
     }
 
-    private ArrayList getConnectList() {
-        ArrayList l = new ArrayList(uris);
-
-        // Randomly, reorder the list by random swapping
-        Random r = new Random();
-        r.setSeed(System.currentTimeMillis());
-        for (int i = 0; i < l.size(); i++) {
-            int p = r.nextInt(l.size());
-            Object t = l.get(p);
-            l.set(p, l.get(i));
-            l.set(i, t);
+    private ArrayList getConnectList(){
+        ArrayList l=new ArrayList(uris);
+        if (randomize){
+            // Randomly, reorder the list by random swapping
+            Random r=new Random();
+            r.setSeed(System.currentTimeMillis());
+            for (int i=0;i<l.size();i++){
+                int p=r.nextInt(l.size());
+                Object t=l.get(p);
+                l.set(p,l.get(i));
+                l.set(i,t);
+            }
         }
         return l;
     }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,97 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.net.URI;
+
+import javax.jms.*;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+
+public class FTBrokerTest extends TestCase {
+    
+    protected static final int MESSAGE_COUNT = 10;
+    protected BrokerService master;
+    protected BrokerService slave;
+    protected Connection connection;
+    protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
+    //protected String uriString = "tcp://localhost:62001";
+
+    protected void setUp() throws Exception {
+        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/master.xml"));
+        brokerFactory.afterPropertiesSet();
+        master = brokerFactory.getBroker();
+        brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/ft/slave.xml"));
+        brokerFactory.afterPropertiesSet();
+        slave = brokerFactory.getBroker();
+        //uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")?randomize=false";
+        //uriString = "failover://(" + master.getVmConnectorURI() + "," + slave.getVmConnectorURI() + ")";
+        System.out.println("URI = " + uriString);
+        URI uri = new URI(uriString);
+        ConnectionFactory fac = new ActiveMQConnectionFactory(uri);
+        connection = fac.createConnection();
+        master.start();
+        slave.start();
+        //wait for thing to connect
+        Thread.sleep(1000);
+        connection.start();
+        super.setUp();
+        
+        
+       
+    }
+
+
+ 
+    
+    protected void tearDown() throws Exception {
+        try {
+        connection.close();
+        slave.stop();
+        master.stop();
+        }catch(Throwable e){
+            e.printStackTrace();
+        }
+        
+        super.tearDown();
+    }
+    
+    public void testFTBroker() throws Exception{
+       
+        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getClass().toString());
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < MESSAGE_COUNT; i++){
+            Message msg = session.createTextMessage("test: " + i);
+            producer.send(msg);
+        }
+        master.stop();
+        session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+        for (int i = 0; i < MESSAGE_COUNT; i++){
+            System.out.println("GOT MSG: " + consumer.receive(1000));
+        }
+        
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml (added)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml Wed Jan 18 11:16:58 2006
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+
+  <broker brokerName="master" persistent="false" useJmx="false">
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:62001"/>
+    </transportConnectors>
+
+    <persistenceAdapter>
+      <memoryPersistenceAdapter/>
+    </persistenceAdapter>
+  </broker>
+
+</beans>

Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml (added)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml Wed Jan 18 11:16:58 2006
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+
+  <broker brokerName="slave" persistent="false" useJmx="false" masterConnectorURI="tcp://localhost:62001">
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:62002"/>
+    </transportConnectors>
+    
+    
+
+    <persistenceAdapter>
+      <memoryPersistenceAdapter/>
+    </persistenceAdapter>
+  </broker>
+
+</beans>

Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message