activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r678865 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
Date Tue, 22 Jul 2008 19:21:43 GMT
Author: chirino
Date: Tue Jul 22 12:21:43 2008
New Revision: 678865

URL: http://svn.apache.org/viewvc?rev=678865&view=rev
Log:
Updated test case to show that it fails with even the default prefetch size.

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java?rev=678865&r1=678864&r2=678865&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java Tue Jul 22 12:21:43 2008
@@ -16,25 +16,20 @@
  */
 package org.apache.activemq.bugs;
 
-import junit.framework.TestCase;
-
 import java.util.ArrayList;
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,8 +38,8 @@
  * This is a test case for the issue reported at:
  * https://issues.apache.org/activemq/browse/AMQ-1866
  * 
- * If you have a JMS producer sending messages to multiple consumers and 
- * you have a low prefetch, eventually all consumers will run as slow as 
+ * If you have a JMS producer sending messages to multiple fast consumers and 
+ * one slow consumer, eventually all consumers will run as slow as 
  * the slowest consumer.  
  */
 public class AMQ1866 extends TestCase {
@@ -55,9 +50,9 @@
     
     String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";    
     String ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
-    String REQUEST_QUEUE = "provider.queue";
     
     AtomicBoolean shutdown = new AtomicBoolean();
+    private ActiveMQQueue destination;
 
     @Override
     protected void setUp() throws Exception {
@@ -65,6 +60,7 @@
         brokerService = new BrokerService();
         brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
         brokerService.start();
+        destination = new ActiveMQQueue(getName());
     }
     
     @Override
@@ -96,15 +92,21 @@
     }
 
     public void doTestConsumerSlowDown() throws InterruptedException {
-        ConsumerThread c1 = new ConsumerThread("Consumer-1");
-        ConsumerThread c2 = new ConsumerThread("Consumer-2");
         ProducerThread p1 = new ProducerThread("Producer-1");
-        threads.add(c1);
-        threads.add(c2);
         threads.add(p1);
+        p1.start();
+        
+        // Wait a bit before starting the consumers to load up the queues a bit..
+        // If the queue is loaded up it seems that the even the Default Prefetch size case fails.
+        Thread.sleep(10000);
+
+        ConsumerThread c1 = new ConsumerThread("Consumer-1");
+        threads.add(c1);
         c1.start();
+
+        ConsumerThread c2 = new ConsumerThread("Consumer-2");
+        threads.add(c2);
         c2.start();
-        p1.start();
 
         for ( int i=0; i < 30; i++) {
             Thread.sleep(1000);
@@ -114,7 +116,7 @@
             System.out.println("p1: "+p1Counter+", c1: "+c1Counter+", c2: "+c2Counter);
             
             // Once message have been flowing for a few seconds, start asserting that c2 always gets messages.  It should be receiving about 100 / sec
-            if( i > 2 ) {
+            if( i > 3 ) {
                 assertTrue("Consumer 2 should be receiving new messages every second.", c2Counter > 0);
             }
         }
@@ -134,19 +136,18 @@
                 
                 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
                 factory.setDispatchAsync(true);
-                Destination requestDestination = new ActiveMQQueue(REQUEST_QUEUE);
                 
                 connection = factory.createConnection();
                 
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                MessageProducer producer = session.createProducer(requestDestination);
+                MessageProducer producer = session.createProducer(destination);
                 connection.start();
                 
                 int i = 0;
                 while (!shutdown.get()) {
                     producer.send(session.createTextMessage(getName()+" Message "+(++i)));
                     counter.incrementAndGet();
-                    Thread.sleep(10);
+                    Thread.sleep(1);
                 }
                 
             } catch (Exception e) {
@@ -175,15 +176,13 @@
                 
                 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
                 factory.setDispatchAsync(true);
-                Destination requestDestination = new ActiveMQQueue(REQUEST_QUEUE);
                 
                 connection = factory.createConnection();
                 
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                MessageConsumer consumer = session.createConsumer(requestDestination);
+                MessageConsumer consumer = session.createConsumer(destination);
                 connection.start();
                 
-                int i = 0;
                 while (!shutdown.get()) {
                     TextMessage msg = (TextMessage)consumer.receive(1000);
                     if ( msg!=null ) {
@@ -191,7 +190,7 @@
                         if (getName().equals("Consumer-1")) {
                             sleepingTime = 10 * 1000;
                         } else {
-                            sleepingTime = 10; 
+                            sleepingTime = 1; 
                         }
                         Thread.sleep(sleepingTime);
                         counter.incrementAndGet();



Mime
View raw message