activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r359769 - in /incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq: ./ broker/ broker/policy/
Date Thu, 29 Dec 2005 10:08:16 GMT
Author: aco
Date: Thu Dec 29 02:07:55 2005
New Revision: 359769

URL: http://svn.apache.org/viewcvs?rev=359769&view=rev
Log:
- Added test support for multiple consumers and producers
- Added test cases for queue and topic subscriptions
- Added test cases for the different dispatch policies

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=359769&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,273 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Arrays;
+import java.util.Collections;
+import java.net.URI;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.*;
+
+/**
+ * Test case support used to test multiple message comsumers and message producers connecting
to a single broker.
+ * 
+ * @version $Revision$
+ */
+public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
+    private AtomicInteger producerLock;
+
+    protected Map consumers = new HashMap(); // Map of consumer with messages received
+    protected int consumerCount = 1;
+    protected int producerCount = 1;
+
+    protected int messageSize  = 1024;
+
+    protected boolean useConcurrentSend = true;
+    protected boolean durable = false;
+    protected boolean topic = false;
+
+    protected BrokerService broker;
+    protected Destination destination;
+    protected List connections = Collections.synchronizedList(new ArrayList());
+
+    protected void startProducers(Destination dest, int msgCount) throws Exception {
+        startProducers(createConnectionFactory(), dest, msgCount);
+    }
+
+    protected void startProducers(final ConnectionFactory factory, final Destination dest,
final int msgCount) throws Exception {
+        // Use concurrent send
+        if (useConcurrentSend) {
+            producerLock = new AtomicInteger(producerCount);
+
+            for (int i=0; i<producerCount; i++) {
+                Thread t = new Thread(new Runnable() {
+                    public void run() {
+                        try {
+                            sendMessages(factory.createConnection(), dest, msgCount);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+
+                        synchronized (producerLock) {
+                            producerLock.decrementAndGet();
+                            producerLock.notifyAll();
+                        }
+                    }
+                });
+
+                t.start();
+            }
+
+            // Wait for all producers to finish sending
+            synchronized (producerLock) {
+                while (producerLock.get() != 0) {
+                    producerLock.wait();
+                }
+            }
+
+
+        // Use serialized send
+        } else {
+            for (int i=0; i<producerCount; i++) {
+                sendMessages(factory.createConnection(), dest, msgCount);
+            }
+        }
+    }
+
+    protected void sendMessages(Connection connection, Destination destination, int count)
throws Exception {
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+
+        for (int i = 0; i < count; i++) {
+            TextMessage msg = createTextMessage(session, "" + i);
+            producer.send(msg);
+        }
+
+        producer.close();
+        session.close();
+        connection.close();
+    }
+
+    protected TextMessage createTextMessage(Session session, String initText) throws Exception
{
+        TextMessage msg = session.createTextMessage();
+
+        // Pad message text
+        if (initText.length() < messageSize) {
+            char[] data = new char[messageSize - initText.length()];
+            Arrays.fill(data, '*');
+            String str = new String(data);
+            msg.setText(initText + str);
+
+        // Do not pad message text
+        } else {
+            msg.setText(initText);
+        }
+
+        return msg;
+    }
+
+    protected void startConsumers(Destination dest) throws Exception {
+        startConsumers(createConnectionFactory(), dest);
+    }
+
+    protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception
{
+        MessageConsumer consumer;
+        for (int i=0; i<consumerCount; i++) {
+            if (durable && topic) {
+                consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer"
+ (i+1));
+            } else {
+                consumer = createMessageConsumer(factory.createConnection(), dest);
+            }
+            // Add consumer object and message list
+            consumers.put(consumer, new ArrayList());
+        }
+    }
+
+    protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws
Exception {
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer consumer = sess.createConsumer(dest);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                List messageList = (List)consumers.get(consumer);
+                messageList.add(message);
+            }
+        });
+        conn.start();
+
+        return consumer;
+    }
+
+    protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest,
String name) throws Exception {
+        conn.setClientID(name);
+        connections.add(conn);
+
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest,
name);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                List messageList = (List)consumers.get(consumer);
+                messageList.add(message);
+            }
+        });
+        conn.start();
+
+        return consumer;
+    }
+
+    protected void waitForAllMessagesToBeReceived(int timeout) throws Exception {
+        Thread.sleep(timeout);
+    }
+
+    protected ActiveMQDestination createDestination() throws JMSException {
+        if (topic) {
+            destination = new ActiveMQTopic("Topic");
+            return (ActiveMQDestination)destination;
+        } else {
+            destination = new ActiveMQQueue("Queue");
+            return (ActiveMQDestination)destination;
+        }
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+    }
+
+    protected void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        broker = createBroker();
+        broker.start();
+    }
+
+    protected void tearDown() throws Exception {
+        for (Iterator iter = connections.iterator(); iter.hasNext();) {
+            Connection conn= (Connection) iter.next();
+            try {
+                conn.close();
+            } catch (Throwable e) {
+            }
+        }
+        broker.stop();
+        super.tearDown();
+    }
+
+    /*
+     * Some helpful assertions for multiple consumers.
+     */
+    protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount)
{
+        List messageList = (List)consumers.get(consumer);
+        assertTrue("Consumer received less than " + msgCount + " messages. Actual messages
received is " + messageList.size(), (messageList.size() >= msgCount));
+    }
+
+    protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount)
{
+        List messageList = (List)consumers.get(consumer);
+        assertTrue("Consumer received more than " + msgCount + " messages. Actual messages
received is " + messageList.size(), (messageList.size() <= msgCount));
+    }
+
+    protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount)
{
+        List messageList = (List)consumers.get(consumer);
+        assertTrue("Consumer should have received exactly " + msgCount + " messages. Actual
messages received is " + messageList.size(), (messageList.size() == msgCount));
+    }
+
+    protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
+        for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
+            assertConsumerReceivedAtLeastXMessages((MessageConsumer)i.next(), msgCount);
+        }
+    }
+
+    protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) {
+        for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
+            assertConsumerReceivedAtMostXMessages((MessageConsumer)i.next(), msgCount);
+        }
+    }
+
+    protected void assertEachConsumerReceivedXMessages(int msgCount) {
+        for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
+            assertConsumerReceivedXMessages((MessageConsumer)i.next(), msgCount);
+        }
+    }
+
+    protected void assertTotalMessagesReceived(int msgCount) {
+        int totalMsg = 0;
+        for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
+            totalMsg += ((List)consumers.get(i.next())).size();
+        }
+
+        assertTrue("Total messages received should have been " + msgCount + ". Actual messages
received is " + totalMsg, (totalMsg == msgCount));
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java?rev=359769&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,147 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
+    protected int messageCount  = 1000; // 1000 Messages per producer
+    protected int prefetchCount = 10;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        durable = false;
+        topic = false;
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 1000;
+        prefetchCount = 1;
+        messageSize   = 1024; // 1 Kb
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 1000;
+        prefetchCount = messageCount * 2;
+        messageSize   = 1024; // 1 Kb
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 10;
+        prefetchCount = 1;
+        messageSize   = 1024 * 1024 * 1; // 2 MB
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception
{
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 10;
+        prefetchCount = messageCount * 2;
+        messageSize   = 1024 * 1024 * 1; // 2 MB
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        consumerCount = 50;
+        producerCount = 1;
+        messageCount  = 10;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        consumerCount = 50;
+        producerCount = 1;
+        messageCount  = 1000;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
+    public void testManyProducersOneConsumer() throws Exception {
+        consumerCount = 1;
+        producerCount = 50;
+        messageCount  = 100;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
+    public void testManyProducersManyConsumers() throws Exception {
+        consumerCount = 50;
+        producerCount = 50;
+        messageCount  = 100;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount);
+    }
+
+    public void doMultipleClientsTest() throws Exception {
+        // Create destination
+        final ActiveMQDestination dest = createDestination();
+
+        // Create consumers
+        ActiveMQConnectionFactory consumerFactory = (ActiveMQConnectionFactory)createConnectionFactory();
+        consumerFactory.getPrefetchPolicy().setAll(prefetchCount);
+
+        startConsumers(consumerFactory, dest);
+
+        // Wait for consumers to setup
+        Thread.sleep(500);
+
+        startProducers(dest, messageCount);
+
+        // Wait for messages to be received. Make it proportional to the messages delivered.
+        waitForAllMessagesToBeReceived((producerCount * messageCount) / 2000);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?rev=359769&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,123 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+public class TopicSubscriptionTest extends QueueSubscriptionTest {
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        durable = true;
+        topic = true;
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        prefetchCount = 1;
+        messageSize   = 1024;
+        messageCount  = 1000;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 1000;
+        messageSize   = 1024;
+        prefetchCount = messageCount * 2;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 10;
+        messageSize   = 1024 * 1024 * 1; // 1 MB
+        prefetchCount = 1;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception
{
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 10;
+        messageSize   = 1024 * 1024 * 1; // 1 MB
+        prefetchCount = messageCount * 2;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+    }
+
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        consumerCount = 50;
+        producerCount = 1;
+        messageCount  = 10;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+    }
+
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        consumerCount = 50;
+        producerCount = 1;
+        messageCount  = 100;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+    }
+
+
+    public void testManyProducersOneConsumer() throws Exception {
+        consumerCount = 1;
+        producerCount = 20;
+        messageCount  = 100;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
+    }
+
+    public void testManyProducersManyConsumers() throws Exception {
+        consumerCount = 20;
+        producerCount = 20;
+        messageCount  = 20;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java?rev=359769&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,88 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.QueueSubscriptionTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+
+public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setDispatchPolicy(new RoundRobinDispatchPolicy());
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
+
+        // Ensure that each consumer should have received at least one message
+        // We cannot guarantee that messages will be equally divided, since prefetch is one
+        assertEachConsumerReceivedAtLeastXMessages(1);
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
+
+        // Ensure that each consumer should have received at least one message
+        // We cannot guarantee that messages will be equally divided, since prefetch is one
+        assertEachConsumerReceivedAtLeastXMessages(1);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception
{
+        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        super.testOneProducerManyConsumersFewMessages();
+
+        // Since there are more consumers, each consumer should have received at most one
message only
+        assertMessagesDividedAmongConsumers();
+    }
+
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        super.testOneProducerManyConsumersManyMessages();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    public void testManyProducersManyConsumers() throws Exception {
+        super.testManyProducersManyConsumers();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    public void assertMessagesDividedAmongConsumers() {
+        assertEachConsumerReceivedAtLeastXMessages((messageCount * producerCount) / consumerCount);
+        assertEachConsumerReceivedAtMostXMessages(((messageCount * producerCount) / consumerCount)
+ 1);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java?rev=359769&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.QueueSubscriptionTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setDispatchPolicy(new SimpleDispatchPolicy());
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+
+        // One consumer should have received all messages, and the rest none
+        assertOneConsumerReceivedAllMessages(messageCount);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception
{
+        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+
+        // One consumer should have received all messages, and the rest none
+        assertOneConsumerReceivedAllMessages(messageCount);
+    }
+
+    public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception {
+        boolean found = false;
+        for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
+            List messageList = (List)consumers.get(i.next());
+            if (messageList.size() > 0) {
+                if (found) {
+                    fail("No other consumers should have received any messages");
+                } else {
+                    assertTrue("Consumer should have received all " + messageCount + " messages.
Actual messages received is " + messageList.size(), messageList.size()==messageCount);
+                    found = true;
+                }
+            }
+        }
+
+        if (!found) {
+            fail("At least one consumer should have received all messages");
+        }
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java?rev=359769&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TopicSubscriptionTest;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
+
+import java.util.List;
+import java.util.Iterator;
+
+public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setDispatchPolicy(new StrictOrderDispatchPolicy());
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
+
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
+
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception
{
+        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        super.testOneProducerManyConsumersFewMessages();
+
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        super.testOneProducerManyConsumersManyMessages();
+
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testManyProducersOneConsumer() throws Exception {
+        super.testManyProducersOneConsumer();
+
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testManyProducersManyConsumers() throws Exception {
+        super.testManyProducersManyConsumers();
+
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void assertReceivedMessagesAreOrdered() throws Exception {
+        // If there is only one consumer, messages is definitely ordered
+        if (consumers.size() <= 1) {
+            return;
+        }
+
+        // Get basis of order
+        Iterator i = consumers.keySet().iterator();
+        List messageOrder = (List)consumers.get(i.next());
+
+        for (;i.hasNext();) {
+            List messageList = (List)consumers.get(i.next());
+            assertTrue("Messages are not ordered.", messageOrder.equals(messageList));
+        }
+    }
+}



Mime
View raw message