activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r360195 - in /incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq: ./ broker/ broker/policy/ test/retroactive/ transport/peer/ util/
Date Fri, 30 Dec 2005 23:25:33 GMT
Author: chirino
Date: Fri Dec 30 15:25:03 2005
New Revision: 360195

URL: http://svn.apache.org/viewcvs?rev=360195&view=rev
Log:
Depending on the test configuration parameters, it was possible to get an OutOfMemory error.
 The causes were:
 - The MessageList was holding on to all the messages being consumed, changed this so that
it only holds on to the messageIds
 - Was using a non persistent broker, but was sending it persistent messages, in the topic
case, he holds on to the messages in a memory based message store.  By default we now send
non persistent messages.

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java
      - copied, changed from r360183, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java
Removed:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java
Modified:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
Fri Dec 30 15:25:03 2005
@@ -19,7 +19,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerFactory;
 
@@ -53,11 +53,12 @@
     protected boolean useConcurrentSend = true;
     protected boolean durable = false;
     protected boolean topic = false;
+    protected boolean persistent = false;
 
     protected BrokerService broker;
     protected Destination destination;
     protected List connections = Collections.synchronizedList(new ArrayList());
-    protected MessageList allMessagesList = new MessageList();
+    protected MessageIdList allMessagesList = new MessageIdList();
 
     protected void startProducers(Destination dest, int msgCount) throws Exception {
         startProducers(createConnectionFactory(), dest, msgCount);
@@ -108,7 +109,8 @@
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(destination);
-
+        producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+        
         for (int i = 0; i < count; i++) {
             TextMessage msg = createTextMessage(session, "" + i);
             producer.send(msg);
@@ -149,7 +151,7 @@
             } else {
                 consumer = createMessageConsumer(factory.createConnection(), dest);
             }
-            MessageList list = new MessageList();
+            MessageIdList list = new MessageIdList();
             list.setParent(allMessagesList);
             consumer.setMessageListener(list);
             consumers.put(consumer, list);
@@ -222,18 +224,18 @@
      * Some helpful assertions for multiple consumers.
      */
     protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount)
{
-        MessageList messageList = (MessageList)consumers.get(consumer);
-        messageList.assertAtLeastMessagesReceived(msgCount);
+        MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
+        messageIdList.assertAtLeastMessagesReceived(msgCount);
     }
 
     protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount)
{
-        MessageList messageList = (MessageList)consumers.get(consumer);
-        messageList.assertAtMostMessagesReceived(msgCount);
+        MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
+        messageIdList.assertAtMostMessagesReceived(msgCount);
     }
 
     protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount)
{
-        MessageList messageList = (MessageList)consumers.get(consumer);
-        messageList.assertMessagesReceivedNoWait(msgCount);
+        MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
+        messageIdList.assertMessagesReceivedNoWait(msgCount);
     }
 
     protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
@@ -260,8 +262,8 @@
         // now lets count the individual messages received 
         int totalMsg = 0;
         for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
-            MessageList messageList = (MessageList)consumers.get(i.next());
-            totalMsg += messageList.getMessageCount();
+            MessageIdList messageIdList = (MessageIdList)consumers.get(i.next());
+            totalMsg += messageIdList.getMessageCount();
         }
         assertEquals("Total of consumers message count", msgCount, totalMsg);
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
Fri Dec 30 15:25:03 2005
@@ -24,36 +24,36 @@
         topic = true;
     }
 
-    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
+        messageCount  = 100;
+        messageSize   = 1024 * 1024 * 1; // 1 MB
         prefetchCount = 1;
-        messageSize   = 1024;
-        messageCount  = 1000;
 
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
     }
 
-    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
         consumerCount = 2;
         producerCount = 1;
-        messageCount  = 1000;
+        prefetchCount = 1;
         messageSize   = 1024;
-        prefetchCount = messageCount * 2;
+        messageCount  = 1000;
 
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
     }
 
-    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
         consumerCount = 2;
         producerCount = 1;
-        messageCount  = 10;
-        messageSize   = 1024 * 1024 * 1; // 1 MB
-        prefetchCount = 1;
+        messageCount  = 1000;
+        messageSize   = 1024;
+        prefetchCount = messageCount * 2;
 
         doMultipleClientsTest();
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
Fri Dec 30 15:25:03 2005
@@ -21,7 +21,7 @@
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
 
 import java.util.Iterator;
 import java.util.List;
@@ -59,8 +59,8 @@
     public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception {
         boolean found = false;
         for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
-            MessageList messageList = (MessageList)consumers.get(i.next());
-            int count = messageList.getMessageCount();
+            MessageIdList messageIdList = (MessageIdList)consumers.get(i.next());
+            int count = messageIdList.getMessageCount();
             if (count > 0) {
                 if (found) {
                     fail("No other consumers should have received any messages");

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
Fri Dec 30 15:25:03 2005
@@ -21,7 +21,7 @@
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
 
 import java.util.List;
 import java.util.Iterator;
@@ -42,20 +42,20 @@
         return broker;
     }
 
-    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
-        super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
 
         assertReceivedMessagesAreOrdered();
     }
 
-    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
-        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
 
         assertReceivedMessagesAreOrdered();
     }
 
-    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
-        super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
 
         assertReceivedMessagesAreOrdered();
     }
@@ -98,11 +98,11 @@
 
         // Get basis of order
         Iterator i = consumers.keySet().iterator();
-        MessageList messageOrder = (MessageList)consumers.get(i.next());
+        MessageIdList messageOrder = (MessageIdList)consumers.get(i.next());
 
         for (;i.hasNext();) {
-            MessageList messageList = (MessageList)consumers.get(i.next());
-            assertTrue("Messages are not ordered.", messageOrder.equals(messageList));
+            MessageIdList messageIdList = (MessageIdList)consumers.get(i.next());
+            assertTrue("Messages are not ordered.", messageOrder.equals(messageIdList));
         }
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java
Fri Dec 30 15:25:03 2005
@@ -19,7 +19,7 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.springframework.core.io.ClassPathResource;
 
@@ -60,7 +60,7 @@
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         MessageConsumer consumer = session.createConsumer(destination);
-        MessageList listener = new MessageList();
+        MessageIdList listener = new MessageIdList();
         consumer.setMessageListener(listener);
         listener.waitForMessagesToArrive(messageCount);
         listener.assertMessagesReceived(messageCount);

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
Fri Dec 30 15:25:03 2005
@@ -19,7 +19,7 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.springframework.core.io.ClassPathResource;
 
@@ -49,7 +49,7 @@
         connection.start();
 
         MessageConsumer consumer = session.createConsumer(destination);
-        MessageList listener = new MessageList();
+        MessageIdList listener = new MessageIdList();
         listener.setVerbose(true);
         consumer.setMessageListener(listener);
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
Fri Dec 30 15:25:03 2005
@@ -20,7 +20,7 @@
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -47,13 +47,13 @@
     protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
     protected MessageProducer[] producers;
     protected Connection[] connections;
-    protected MessageList messageList[];
+    protected MessageIdList messageIdList[];
 
     protected void setUp() throws Exception {
         
         connections = new Connection[NUMBER_IN_CLUSTER];
         producers = new MessageProducer[NUMBER_IN_CLUSTER];
-        messageList = new MessageList[NUMBER_IN_CLUSTER];
+        messageIdList = new MessageIdList[NUMBER_IN_CLUSTER];
         Destination destination = createDestination();
 
         String root = System.getProperty("activemq.store.dir");
@@ -67,8 +67,8 @@
             producers[i] = session.createProducer(destination);
             producers[i].setDeliveryMode(deliveryMode);
             MessageConsumer consumer = createMessageConsumer(session, destination);
-            messageList[i] = new MessageList();
-            consumer.setMessageListener(messageList[i]);
+            messageIdList[i] = new MessageIdList();
+            consumer.setMessageListener(messageIdList[i]);
         }
         System.out.println("Sleeping to ensure cluster is fully connected");
         Thread.sleep(10000);
@@ -120,7 +120,7 @@
         }
         
         for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
-            messageList[i].assertMessagesReceived(expectedReceiveCount());
+            messageIdList[i].assertMessagesReceived(expectedReceiveCount());
         }
     }
     

Copied: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java
(from r360183, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java?p2=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java&p1=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java&r1=360183&r2=360195&rev=360195&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java
Fri Dec 30 15:25:03 2005
@@ -16,14 +16,13 @@
  */
 package org.apache.activemq.util;
 
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
 import junit.framework.Assert;
 
 /**
@@ -38,38 +37,38 @@
  * 
  * @version $Revision: 1.6 $
  */
-public class MessageList extends Assert implements MessageListener {
-    private List messages = new ArrayList();
+public class MessageIdList extends Assert implements MessageListener {
+    private List messageIds = new ArrayList();
     private Object semaphore;
     private boolean verbose;
     private MessageListener parent;
     private long maximumDuration = 15000L;
 
-    public MessageList() {
+    public MessageIdList() {
         this(new Object());
     }
 
-    public MessageList(Object semaphore) {
+    public MessageIdList(Object semaphore) {
         this.semaphore = semaphore;
     }
 
     public boolean equals(Object that) {
-        if (that instanceof MessageList) {
-            MessageList thatList = (MessageList) that;
-            return getMessages().equals(thatList.getMessages());
+        if (that instanceof MessageIdList) {
+            MessageIdList thatList = (MessageIdList) that;
+            return getMessageIds().equals(thatList.getMessageIds());
         }
         return false;
     }
 
     public int hashCode() {
         synchronized (semaphore) {
-            return messages.hashCode() + 1;
+            return messageIds.hashCode() + 1;
         }
     }
 
     public String toString() {
         synchronized (semaphore) {
-            return messages.toString();
+            return messageIds.toString();
         }
     }
 
@@ -78,31 +77,15 @@
      */
     public List flushMessages() {
         synchronized (semaphore) {
-            List answer = new ArrayList(messages);
-            messages.clear();
+            List answer = new ArrayList(messageIds);
+            messageIds.clear();
             return answer;
         }
     }
 
-    public synchronized List getMessages() {
+    public synchronized List getMessageIds() {
         synchronized (semaphore) {
-            return new ArrayList(messages);
-        }
-    }
-
-    public synchronized List getTextMessages() {
-        synchronized (semaphore) {
-            ArrayList l = new ArrayList();
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                try {
-                    TextMessage m = (TextMessage) iter.next();
-                    l.add(m.getText());
-                }
-                catch (Throwable e) {
-                    l.add("" + e);
-                }
-            }
-            return l;
+            return new ArrayList(messageIds);
         }
     }
 
@@ -110,18 +93,24 @@
         if (parent != null) {
             parent.onMessage(message);
         }
-        synchronized (semaphore) {
-            messages.add(message);
-            semaphore.notifyAll();
-        }
-        if (verbose) {
-            System.out.println("###Êreceived message: " + message);
+        String id=null;
+        try {
+            id = message.getJMSMessageID();
+            synchronized (semaphore) {
+                messageIds.add(id);
+                semaphore.notifyAll();
+            }
+            if (verbose) {
+                System.out.println("###Êreceived message: " + message);
+            }
+        } catch (JMSException e) {
+            e.printStackTrace();
         }
     }
 
     public int getMessageCount() {
         synchronized (semaphore) {
-            return messages.size();
+            return messageIds.size();
         }
     }
 



Mime
View raw message