activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1174734 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
Date Fri, 23 Sep 2011 13:13:33 GMT
Author: tabish
Date: Fri Sep 23 13:13:33 2011
New Revision: 1174734

URL: http://svn.apache.org/viewvc?rev=1174734&view=rev
Log:
Remove the dependency on the fixed 61616 port number

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java?rev=1174734&r1=1174733&r2=1174734&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java Fri
Sep 23 13:13:33 2011
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This is a test case for the issue reported at:
- * https://issues.apache.org/activemq/browse/AMQ-2021 
+ * https://issues.apache.org/activemq/browse/AMQ-2021
  * Bug is modification of inflight message properties so the failure can manifest itself
in a bunch
  * or ways, from message receipt with null properties to marshall errors
  */
@@ -53,25 +53,26 @@ public class AMQ2021Test extends TestCas
     BrokerService brokerService;
     ArrayList<Thread> threads = new ArrayList<Thread>();
     Vector<Throwable> exceptions;
-    
+
     AMQ2021Test testCase;
-    
-    String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";    
-    String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND + "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
-    
-    private int numMessages = 1000;
-    private int numConsumers = 2;
-    private int dlqMessages = numMessages/2;
-    
-    CountDownLatch receivedLatch;
+
+    private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
+    private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
+    private String PRODUCER_BROKER_URL;
+
+    private final int numMessages = 1000;
+    private final int numConsumers = 2;
+    private final int dlqMessages = numMessages/2;
+
+    private CountDownLatch receivedLatch;
     private ActiveMQTopic destination;
-    public CountDownLatch started;
+    private CountDownLatch started;
 
     @Override
     protected void setUp() throws Exception {
         Thread.setDefaultUncaughtExceptionHandler(this);
         testCase = this;
-        
+
         // Start an embedded broker up.
         brokerService = new BrokerService();
         brokerService.setDeleteAllMessagesOnStartup(true);
@@ -79,18 +80,21 @@ public class AMQ2021Test extends TestCas
         brokerService.start();
         destination = new ActiveMQTopic(getName());
         exceptions = new Vector<Throwable>();
-        
-        receivedLatch = 
+
+        CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString()
+ CONSUMER_BROKER_URL;
+        PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+
+        receivedLatch =
             new CountDownLatch(numConsumers * (numMessages + dlqMessages));
         started = new CountDownLatch(1);
     }
-    
+
     @Override
     protected void tearDown() throws Exception {
         for (Thread t : threads) {
             t.interrupt();
             t.join();
-        }        
+        }
         brokerService.stop();
     }
 
@@ -101,9 +105,9 @@ public class AMQ2021Test extends TestCas
             threads.add(c1);
             c1.start();
         }
-        
+
         assertTrue(started.await(10, TimeUnit.SECONDS));
-        
+
         Thread producer = new Thread() {
             @Override
             public void run() {
@@ -115,34 +119,34 @@ public class AMQ2021Test extends TestCas
         };
         threads.add(producer);
         producer.start();
-        
-        boolean allGood = receivedLatch.await(30, TimeUnit.SECONDS);
+
+        boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS);
         for (Throwable t: exceptions) {
             log.error("failing test with first exception", t);
             fail("exception during test : " + t);
-        }        
+        }
         assertTrue("excepted messages received within time limit", allGood);
-        
+
         assertEquals(0, exceptions.size());
-    
+
         for (int i=0; i<numConsumers; i++) {
             // last recovery sends message to deq so is not received again
             assertEquals(dlqMessages*2, ((ConsumerThread)threads.get(i)).recoveries);
             assertEquals(numMessages + dlqMessages, ((ConsumerThread)threads.get(i)).counter);
         }
-       
+
         // half of the messages for each consumer should go to the dlq but duplicates will
         // be suppressed
         consumeFromDLQ(dlqMessages);
-        
-    }    
-    
+
+    }
+
     private void consumeFromDLQ( int messageCount) throws Exception {
-        ActiveMQConnectionFactory connectionFactory = 
-            new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URL);
+        ActiveMQConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
         Connection connection = connectionFactory.createConnection();
         connection.start();
-                    
+
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
         int count = 0;
@@ -158,19 +162,19 @@ public class AMQ2021Test extends TestCas
     public void produce(int count) throws Exception {
         Connection connection=null;
         try {
-            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL);
             connection = factory.createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageProducer producer = session.createProducer(destination);
             producer.setTimeToLive(0);
             connection.start();
-            
+
             for (int i=0 ; i< count; i++) {
                 int id = i+1;
                 TextMessage message = session.createTextMessage(getName()+" Message "+ id);
                 message.setIntProperty("MsgNumber", id);
                 producer.send(message);
-                
+
                 if (id % 500 == 0) {
                     log.info("sent " + id + ", ith " + message);
                 }
@@ -187,7 +191,7 @@ public class AMQ2021Test extends TestCas
             }
         }
     }
-    
+
     public class ConsumerThread extends Thread implements MessageListener   {
         public long counter = 0;
         public long recoveries = 0;
@@ -199,24 +203,24 @@ public class AMQ2021Test extends TestCas
 
         public void run() {
             try {
-                ActiveMQConnectionFactory connectionFactory = 
-                    new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URL);
+                ActiveMQConnectionFactory connectionFactory =
+                    new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
                 Connection connection = connectionFactory.createConnection();
                 connection.setExceptionListener(testCase);
-                connection.setClientID(getName());            
+                connection.setClientID(getName());
                 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-                MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
                           
+                MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
                 consumer.setMessageListener(this);
                 connection.start();
-                
+
                 started .countDown();
-            
+
             } catch (JMSException exception) {
                 log.error("unexpected ex in consumer run", exception);
                 exceptions.add(exception);
             }
         }
-        
+
         public void onMessage(Message message) {
             try {
                 counter++;
@@ -226,8 +230,8 @@ public class AMQ2021Test extends TestCas
                     recoveries++;
                 } else {
                     message.acknowledge();
-                }     
-                
+                }
+
                 if (counter % 200 == 0) {
                     log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th
" + message);
                 }
@@ -237,7 +241,7 @@ public class AMQ2021Test extends TestCas
                 exceptions.add(e);
             }
         }
-        
+
     }
 
     public void onException(JMSException exception) {



Mime
View raw message