activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r679849 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
Date Fri, 25 Jul 2008 16:13:20 GMT
Author: chirino
Date: Fri Jul 25 09:13:20 2008
New Revision: 679849

URL: http://svn.apache.org/viewvc?rev=679849&view=rev
Log:
Updating testcase.. this new version fails very fast now.

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java?rev=679849&r1=679848&r2=679849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java Fri Jul 25 09:13:20 2008
@@ -30,6 +30,8 @@
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,6 +60,15 @@
     protected void setUp() throws Exception {
         // Start an embedded broker up.
         brokerService = new BrokerService();
+        brokerService.deleteAllMessages();
+        
+        // A small max page size makes this issue occur faster.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry pe = new PolicyEntry();
+        pe.setMaxPageSize(1);
+        policyMap.put(new ActiveMQQueue(">"), pe);
+        brokerService.setDestinationPolicy(policyMap);
+        
         brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
         brokerService.start();
         destination = new ActiveMQQueue(getName());
@@ -68,52 +79,65 @@
         // Stop any running threads.
         shutdown.set(true);
         for (Thread t : threads) {
+            t.interrupt();
             t.join();
         }        
         brokerService.stop();
     }
 
     // Failing
-    public void testConsumerSlowDownPrefetch0() throws InterruptedException {
+    public void testConsumerSlowDownPrefetch0() throws Exception {
         ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0";
         doTestConsumerSlowDown();
     }
 
     // Failing
-    public void testConsumerSlowDownPrefetch10() throws InterruptedException {
+    public void testConsumerSlowDownPrefetch10() throws Exception {
         ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10";
         doTestConsumerSlowDown();
     }
     
     // Passing
-    public void testConsumerSlowDownDefaultPrefetch() throws InterruptedException {
+    public void testConsumerSlowDownDefaultPrefetch() throws Exception {
         ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
         doTestConsumerSlowDown();
     }
 
-    public void doTestConsumerSlowDown() throws InterruptedException {
-        ProducerThread p1 = new ProducerThread("Producer-1");
-        threads.add(p1);
-        p1.start();
+    public void doTestConsumerSlowDown() throws Exception {
         
-        // Wait a bit before starting the consumers to load up the queues a bit..
-        // If the queue is loaded up it seems that the even the Default Prefetch size case fails.
-        Thread.sleep(10000);
-
+        // Preload the queue.
+        produce(20000);
+        
+        Thread producer = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    while(!shutdown.get()) {
+                        produce(1000);
+                    }
+                } catch (Exception e) {
+                }
+            }
+        };
+        threads.add(producer);
+        producer.start();
+        
+        // This is the slow consumer.
         ConsumerThread c1 = new ConsumerThread("Consumer-1");
         threads.add(c1);
         c1.start();
 
+        // Wait a bit so that the slow consumer gets assigned most of the messages.
+        Thread.sleep(500);
         ConsumerThread c2 = new ConsumerThread("Consumer-2");
         threads.add(c2);
         c2.start();
 
         for ( int i=0; i < 30; i++) {
             Thread.sleep(1000);
-            long p1Counter = p1.counter.getAndSet(0);
             long c1Counter = c1.counter.getAndSet(0);
             long c2Counter = c2.counter.getAndSet(0);
-            System.out.println("p1: "+p1Counter+", c1: "+c1Counter+", c2: "+c2Counter);
+            System.out.println("c1: "+c1Counter+", c2: "+c2Counter);
             
             // Once message have been flowing for a few seconds, start asserting that c2 always gets messages.  It should be receiving about 100 / sec
             if( i > 3 ) {
@@ -122,46 +146,30 @@
         }
     }    
     
-    public class ProducerThread extends Thread {
-        final AtomicLong counter = new AtomicLong();
-        
-        public ProducerThread(String threadId) {
-            super(threadId);
-        }
-
-        public void run() {
-            Connection connection=null;
+    public void produce(int count) throws Exception {
+        Connection connection=null;
+        try {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
+            factory.setDispatchAsync(true);
+            
+            connection = factory.createConnection();
+            
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            connection.start();
+            
+            for( int i=0 ; i< count; i++ ) {
+                producer.send(session.createTextMessage(getName()+" Message "+(++i)));
+            }
+            
+        } finally {
             try {
-                log.debug(getName() + ": is running");
-                
-                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
-                factory.setDispatchAsync(true);
-                
-                connection = factory.createConnection();
-                
-                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                MessageProducer producer = session.createProducer(destination);
-                connection.start();
-                
-                int i = 0;
-                while (!shutdown.get()) {
-                    producer.send(session.createTextMessage(getName()+" Message "+(++i)));
-                    counter.incrementAndGet();
-                    Thread.sleep(1);
-                }
-                
-            } catch (Exception e) {
-                e.printStackTrace();
-            } finally {
-                try {
-                    connection.close();
-                } catch (Throwable e) {
-                }
+                connection.close();
+            } catch (Throwable e) {
             }
         }
-
     }
-
+    
     public class ConsumerThread extends Thread {
         final AtomicLong counter = new AtomicLong();
 
@@ -188,7 +196,7 @@
                     if ( msg!=null ) {
                         int sleepingTime;
                         if (getName().equals("Consumer-1")) {
-                            sleepingTime = 10 * 1000;
+                            sleepingTime = 1000 * 1000;
                         } else {
                             sleepingTime = 1; 
                         }
@@ -198,8 +206,8 @@
                 }
                 
             } catch (Exception e) {
-                e.printStackTrace();
             } finally {
+                log.debug(getName() + ": is stopping");
                 try {
                     connection.close();
                 } catch (Throwable e) {



Mime
View raw message