activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r367958 - in /incubator/activemq/trunk: activemq-core/src/test/java/org/apache/activemq/ assembly/src/test/java/org/apache/activemq/usecases/
Date Wed, 11 Jan 2006 07:43:31 GMT
Author: aco
Date: Tue Jan 10 23:43:05 2006
New Revision: 367958

URL: http://svn.apache.org/viewcvs?rev=367958&view=rev
Log:
- Added a two-broker test in which each broker has a client that connects and disconnects and the
destination is a queue
- Minor updates to some of the test cases and test support

Added:
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
    incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=367958&r1=367957&r2=367958&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Tue Jan 10 23:43:05 2006
@@ -56,6 +56,7 @@
 
     protected int messageSize = 1;
 
+    protected boolean persistentDelivery = true;
     protected boolean verbose = false;
 
     protected void bridgeBrokers(String localBrokerName, String remoteBrokerName) throws
Exception {
@@ -140,6 +141,14 @@
         return broker;
     }
 
+    protected ConnectionFactory getConnectionFactory(String brokerName) throws Exception
{
+        BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
+        if (brokerItem != null) {
+            return brokerItem.factory;
+        }
+        return null;
+    }
+
     protected Connection createConnection(String brokerName) throws Exception {
         BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
         if (brokerItem != null) {
@@ -188,6 +197,7 @@
         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         MessageProducer producer = brokerItem.createProducer(destination, sess);
+        producer.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
 
         for (int i = 0; i < count; i++) {
             TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" +
i);
@@ -350,7 +360,6 @@
                 try {
                     c.close();
                 } catch (ConnectionClosedException e) {
-                    e.printStackTrace();
                 }
             }
 

Modified: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=367958&r1=367957&r2=367958&view=diff
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
(original)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
Tue Jan 10 23:43:05 2006
@@ -27,6 +27,7 @@
  * @version $Revision: 1.1.1.1 $
  */
 public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
+    protected static final int MESSAGE_COUNT = 100;
 
     /**
      * BrokerA -> BrokerB -> BrokerC
@@ -45,7 +46,7 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
 
         // Let's try to wait for any messages. Should be none.
         Thread.sleep(1000);
@@ -73,7 +74,7 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerB", dest, 10);
+        sendMessages("BrokerB", dest, MESSAGE_COUNT);
 
         // Let's try to wait for any messages.
         Thread.sleep(1000);
@@ -82,8 +83,8 @@
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
         MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
 
-        // Total received should be 10
-        assertEquals(10, msgsA.getMessageCount() + msgsC.getMessageCount());
+        // Total received should be 100
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount() + msgsC.getMessageCount());
     }
 
     /**
@@ -103,15 +104,15 @@
         MessageConsumer clientB = createConsumer("BrokerB", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
-        sendMessages("BrokerC", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+        sendMessages("BrokerC", dest, MESSAGE_COUNT);
 
         // Get message count
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
 
-        msgsB.waitForMessagesToArrive(20);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 2);
 
-        assertEquals(20, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
     }
 
     /**
@@ -137,9 +138,9 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
-        sendMessages("BrokerB", dest, 10);
-        sendMessages("BrokerC", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+        sendMessages("BrokerB", dest, MESSAGE_COUNT);
+        sendMessages("BrokerC", dest, MESSAGE_COUNT);
 
         // Let's try to wait for any messages.
         Thread.sleep(1000);
@@ -149,7 +150,7 @@
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
         MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
 
-        assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount()
+ msgsC.getMessageCount());
     }
 
     /**
@@ -170,9 +171,9 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
-        sendMessages("BrokerB", dest, 10);
-        sendMessages("BrokerC", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+        sendMessages("BrokerB", dest, MESSAGE_COUNT);
+        sendMessages("BrokerC", dest, MESSAGE_COUNT);
 
         // Let's try to wait for any messages.
         Thread.sleep(1000);
@@ -182,7 +183,7 @@
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
         MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
 
-        assertEquals(30, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount()
+ msgsC.getMessageCount());
     }
 
     public void setUp() throws Exception {

Modified: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=367958&r1=367957&r2=367958&view=diff
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
(original)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
Tue Jan 10 23:43:05 2006
@@ -27,6 +27,7 @@
  * @version $Revision: 1.1.1.1 $
  */
 public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
+    protected static final int MESSAGE_COUNT = 100;
 
     /**
      * BrokerA -> BrokerB -> BrokerC
@@ -47,22 +48,22 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
-        sendMessages("BrokerB", dest, 10);
-        sendMessages("BrokerC", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+        sendMessages("BrokerB", dest, MESSAGE_COUNT);
+        sendMessages("BrokerC", dest, MESSAGE_COUNT);
 
         // Get message count
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
         MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
 
-        msgsA.waitForMessagesToArrive(10);
-        msgsB.waitForMessagesToArrive(20);
-        msgsC.waitForMessagesToArrive(20);
-
-        assertEquals(10, msgsA.getMessageCount());
-        assertEquals(20, msgsB.getMessageCount());
-        assertEquals(20, msgsC.getMessageCount());
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 2);
+        msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2);
+
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
     }
 
     /**
@@ -84,22 +85,22 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
-        sendMessages("BrokerB", dest, 10);
-        sendMessages("BrokerC", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+        sendMessages("BrokerB", dest, MESSAGE_COUNT);
+        sendMessages("BrokerC", dest, MESSAGE_COUNT);
 
         // Get message count
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
         MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
 
-        msgsA.waitForMessagesToArrive(20);
-        msgsB.waitForMessagesToArrive(10);
-        msgsC.waitForMessagesToArrive(20);
-
-        assertEquals(20, msgsA.getMessageCount());
-        assertEquals(10, msgsB.getMessageCount());
-        assertEquals(20, msgsC.getMessageCount());
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 2);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT);
+        msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2);
+
+        assertEquals(MESSAGE_COUNT * 2, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
     }
 
     /**
@@ -121,22 +122,22 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
-        sendMessages("BrokerB", dest, 10);
-        sendMessages("BrokerC", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+        sendMessages("BrokerB", dest, MESSAGE_COUNT);
+        sendMessages("BrokerC", dest, MESSAGE_COUNT);
 
         // Get message count
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
         MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
 
-        msgsA.waitForMessagesToArrive(10);
-        msgsB.waitForMessagesToArrive(30);
-        msgsC.waitForMessagesToArrive(10);
-
-        assertEquals(10, msgsA.getMessageCount());
-        assertEquals(30, msgsB.getMessageCount());
-        assertEquals(10, msgsC.getMessageCount());
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+        msgsC.waitForMessagesToArrive(MESSAGE_COUNT);
+
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
     }
 
     /**
@@ -162,22 +163,22 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
-        sendMessages("BrokerB", dest, 10);
-        sendMessages("BrokerC", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+        sendMessages("BrokerB", dest, MESSAGE_COUNT);
+        sendMessages("BrokerC", dest, MESSAGE_COUNT);
 
         // Get message count
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
         MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
 
-        msgsA.waitForMessagesToArrive(30);
-        msgsB.waitForMessagesToArrive(30);
-        msgsC.waitForMessagesToArrive(30);
-
-        assertEquals(30, msgsA.getMessageCount());
-        assertEquals(30, msgsB.getMessageCount());
-        assertEquals(30, msgsC.getMessageCount());
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+        msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+
+        assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount());
     }
 
     /**
@@ -198,22 +199,22 @@
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
-        sendMessages("BrokerB", dest, 10);
-        sendMessages("BrokerC", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+        sendMessages("BrokerB", dest, MESSAGE_COUNT);
+        sendMessages("BrokerC", dest, MESSAGE_COUNT);
 
         // Get message count
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
         MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
 
-        msgsA.waitForMessagesToArrive(30);
-        msgsB.waitForMessagesToArrive(30);
-        msgsC.waitForMessagesToArrive(30);
-
-        assertEquals(30, msgsA.getMessageCount());
-        assertEquals(30, msgsB.getMessageCount());
-        assertEquals(30, msgsC.getMessageCount());
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+        msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+
+        assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount());
     }
 
     public void setUp() throws Exception {

Modified: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java?rev=367958&r1=367957&r2=367958&view=diff
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
(original)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
Tue Jan 10 23:43:05 2006
@@ -36,6 +36,8 @@
  * @version $Revision: 1.1.1.1 $
  */
 public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultipleBrokersTestSupport
{
+    protected static final int MESSAGE_COUNT = 10;
+
     protected List bridges;
     protected AtomicInteger msgDispatchCount;
 
@@ -56,20 +58,20 @@
         MessageConsumer clientB = createConsumer("BrokerB", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
 
         // Get message count
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
 
-        msgsA.waitForMessagesToArrive(10);
-        msgsB.waitForMessagesToArrive(10);
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT);
 
-        assertEquals(10, msgsA.getMessageCount());
-        assertEquals(10, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT, msgsB.getMessageCount());
 
         // Check that 10 message dispatch commands are send over the network
-        assertEquals(10, msgDispatchCount.get());
+        assertEquals(MESSAGE_COUNT, msgDispatchCount.get());
     }
 
     /**
@@ -88,14 +90,14 @@
         MessageConsumer clientA = createConsumer("BrokerA", dest);
 
         // Send messages
-        sendMessages("BrokerA", dest, 10);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
 
         // Get message count
         MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
 
-        msgsA.waitForMessagesToArrive(10);
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
 
-        assertEquals(10, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
 
         // Check that no message dispatch commands are send over the network
         assertEquals(0, msgDispatchCount.get());

Added: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java?rev=367958&view=auto
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
(added)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
Tue Jan 10 23:43:05 2006
@@ -0,0 +1,365 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import java.net.URI;
+
+/**
+ * @version $Revision: 1.1.1.1 $
+ */
+public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSupport {
+    protected static final int MESSAGE_COUNT  = 100; // Best if a factor of 100
+    protected static final int PREFETCH_COUNT = 1;
+
+    protected int msgsClient1, msgsClient2;
+    protected String broker1, broker2;
+
+    public void testClientAReceivesOnly() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        doOneClientReceivesOnly();
+    }
+
+    public void testClientBReceivesOnly() throws Exception {
+        broker1 = "BrokerB";
+        broker2 = "BrokerA";
+
+        doOneClientReceivesOnly();
+    }
+
+    public void doOneClientReceivesOnly() throws Exception {
+        // Bridge brokers
+        bridgeBrokers(broker1, broker2);
+        bridgeBrokers(broker2, broker1);
+
+        // Run brokers
+        startAllBrokers();
+
+        // Create queue
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Create consumers
+        MessageConsumer client1 = createConsumer(broker1, dest);
+        MessageConsumer client2 = createConsumer(broker2, dest);
+
+        // Give clients time to register with broker
+        Thread.sleep(500);
+
+        // Always send messages to broker A
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+
+        // Close the second client, messages should be sent to the first client
+        client2.close();
+
+        // Let the first client receive all messages
+        msgsClient1 += receiveAllMessages(client1);
+        client1.close();
+
+        // First client should have received 100 messages
+        assertEquals("Client for " + broker1 + " should have receive all messages.", MESSAGE_COUNT,
msgsClient1);
+    }
+
+    public void testClientAReceivesOnlyAfterReconnect() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        doOneClientReceivesOnlyAfterReconnect();
+    }
+
+    public void testClientBReceivesOnlyAfterReconnect() throws Exception {
+        broker1 = "BrokerB";
+        broker2 = "BrokerA";
+
+        doOneClientReceivesOnlyAfterReconnect();
+    }
+
+    public void doOneClientReceivesOnlyAfterReconnect() throws Exception {
+        // Bridge brokers
+        bridgeBrokers(broker1, broker2);
+        bridgeBrokers(broker2, broker1);
+
+        // Run brokers
+        startAllBrokers();
+
+        // Create queue
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Create first consumer
+        MessageConsumer client1 = createConsumer(broker1, dest);
+        MessageConsumer client2 = createConsumer(broker2, dest);
+
+        // Give clients time to register with broker
+        Thread.sleep(500);
+
+        // Always send message to broker A
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+
+        // Let the first client receive the first 20% of messages
+        msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
+
+        // Disconnect the first client
+        client1.close();
+
+        // Create another client for the first broker
+        client1 = createConsumer(broker1, dest);
+        Thread.sleep(500);
+
+        // Close the second client, messages should be sent to the first client
+        client2.close();
+
+        // Receive the rest of the messages
+        msgsClient1 += receiveAllMessages(client1);
+        client1.close();
+
+        // The first client should have received 100 messages
+        assertEquals("Client for " + broker1 + " should have received all messages.", MESSAGE_COUNT,
msgsClient1);
+    }
+
+    public void testTwoClientsReceiveClientADisconnects() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        doTwoClientsReceiveOneClientDisconnects();
+    }
+
+    public void testTwoClientsReceiveClientBDisconnects() throws Exception {
+        broker1 = "BrokerB";
+        broker2 = "BrokerA";
+
+        doTwoClientsReceiveOneClientDisconnects();
+    }
+
+    public void doTwoClientsReceiveOneClientDisconnects() throws Exception {
+        // Bridge brokers
+        bridgeBrokers(broker1, broker2);
+        bridgeBrokers(broker2, broker1);
+
+        // Run brokers
+        startAllBrokers();
+
+        // Create queue
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Create first client
+        MessageConsumer client1 = createConsumer(broker1, dest);
+        MessageConsumer client2 = createConsumer(broker2, dest);
+
+        // Give clients time to register with broker
+        Thread.sleep(500);
+
+        // Always send messages to broker A
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+
+        // Let each client receive 20% of the messages - 40% total
+        msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
+        msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
+
+        // Disconnect the first client
+        client1.close();
+
+        // Let the second client receive the rest of the messages
+        msgsClient2 += receiveAllMessages(client2);
+        client2.close();
+
+        // First client should have received 20% of the messages
+        assertEquals("Client for " + broker1 + " should have received 20% of the messages.",
(int)(MESSAGE_COUNT * 0.20), msgsClient1);
+
+        // Second client should have received 80% of the messages
+        assertEquals("Client for " + broker2 + " should have received 80% of the messages.",
(int)(MESSAGE_COUNT * 0.80), msgsClient2);
+    }
+
+    public void testTwoClientsReceiveClientAReconnects() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        doTwoClientsReceiveOneClientReconnects();
+    }
+
+    public void testTwoClientsReceiveClientBReconnects() throws Exception {
+        broker1 = "BrokerB";
+        broker2 = "BrokerA";
+
+        doTwoClientsReceiveOneClientReconnects();
+    }
+
+    public void doTwoClientsReceiveOneClientReconnects() throws Exception {
+        // Bridge brokers
+        bridgeBrokers(broker1, broker2);
+        bridgeBrokers(broker2, broker1);
+
+        // Run brokers
+        startAllBrokers();
+
+        // Create queue
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Create the first client
+        MessageConsumer client1 = createConsumer(broker1, dest);
+        MessageConsumer client2 = createConsumer(broker2, dest);
+
+        // Give clients time to register with broker
+        Thread.sleep(500);
+
+        // Always send messages to broker A
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+
+        // Let each client receive 20% of the messages - 40% total
+        msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
+        msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
+
+        // Disconnect the first client
+        client1.close();
+
+        // Let the second client receive 20% more of the total messages
+        msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
+
+        // Create another client for broker 1
+        client1 = createConsumer(broker1, dest);
+        Thread.sleep(500);
+
+        // Let each client receive 20% of the messages - 40% total
+        msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
+        client1.close();
+
+        msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
+        client2.close();
+
+        // First client should have received 40 messages
+        assertEquals("Client for " + broker1 + " should have received 40% of the messages.",
(int)(MESSAGE_COUNT * 0.40), msgsClient1);
+
+        // Second client should have received 60 messages
+        assertEquals("Client for " + broker2 + " should have received 60% of the messages.",
(int)(MESSAGE_COUNT * 0.60), msgsClient2);
+    }
+
+    public void testTwoClientsReceiveTwoClientReconnects() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        // Bridge brokers
+        bridgeBrokers(broker1, broker2);
+        bridgeBrokers(broker2, broker1);
+
+        // Run brokers
+        startAllBrokers();
+
+        // Create queue
+        Destination dest = createDestination("TEST.FOO", false);
+
+        // Create the first client
+        MessageConsumer client1 = createConsumer(broker1, dest);
+        MessageConsumer client2 = createConsumer(broker2, dest);
+
+        // Give clients time to register with broker
+        Thread.sleep(500);
+
+        // Always send messages to broker A
+        sendMessages("BrokerA", dest, MESSAGE_COUNT);
+
+        // Let each client receive 20% of the messages - 40% total
+        msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.20));
+        msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.20));
+
+        // Disconnect both clients
+        client1.close();
+        client2.close();
+
+        // Create another two clients for each broker
+        client1 = createConsumer(broker1, dest);
+        client2 = createConsumer(broker2, dest);
+        Thread.sleep(500);
+
+        // Let each client receive 30% more of the total messages  - 60% total
+        msgsClient1 += receiveExactMessages(client1, (int)(MESSAGE_COUNT * 0.30));
+        client1.close();
+
+        msgsClient2 += receiveExactMessages(client2, (int)(MESSAGE_COUNT * 0.30));
+        client2.close();
+
+        // First client should have received 50% of the messages
+        assertEquals("Client for " + broker1 + " should have received 50% of the messages.",
(int)(MESSAGE_COUNT * 0.50), msgsClient1);
+
+        // Second client should have received 50% of the messages
+        assertEquals("Client for " + broker2 + " should have received 50% of the messages.",
(int)(MESSAGE_COUNT * 0.50), msgsClient2);
+    }
+
+    protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception
{
+        Message msg;
+        int i;
+        for (i=0; i<msgCount; i++) {
+            msg = consumer.receive(1000);
+            if (msg == null) {
+                System.err.println("Consumer failed to receive exactly " + msgCount + " messages.
Actual messages received is: " + i);
+                break;
+            }
+        }
+
+        return i;
+    }
+
+    protected int receiveAllMessages(MessageConsumer consumer) throws Exception {
+        int msgsReceived = 0;
+
+        Message msg;
+        do {
+            msg = consumer.receive(1000);
+            if (msg != null) {
+                msgsReceived++;
+            }
+        } while (msg != null);
+
+        return msgsReceived;
+    }
+
+    protected MessageConsumer createConsumer(String brokerName, Destination dest) throws
Exception {
+        Connection conn = createConnection(brokerName);
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return sess.createConsumer(dest);
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
+
+        // Configure broker connection factory
+        ActiveMQConnectionFactory factoryA, factoryB;
+        factoryA = (ActiveMQConnectionFactory)getConnectionFactory("BrokerA");
+        factoryB = (ActiveMQConnectionFactory)getConnectionFactory("BrokerB");
+
+        // Set prefetch policy
+        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
+        policy.setAll(PREFETCH_COUNT);
+
+        factoryA.setPrefetchPolicy(policy);
+        factoryB.setPrefetchPolicy(policy);
+
+        msgsClient1 = 0;
+        msgsClient2 = 0;
+    }
+}



Mime
View raw message