activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1086378 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Date Mon, 28 Mar 2011 20:10:41 GMT
Author: tabish
Date: Mon Mar 28 20:10:41 2011
New Revision: 1086378

URL: http://svn.apache.org/viewvc?rev=1086378&view=rev
Log:
Update the test case so that it's not dependent on port 61616

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=1086378&r1=1086377&r2=1086378&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Mon Mar 28 20:10:41 2011
@@ -50,7 +50,7 @@ import static org.apache.activemq.TestSu
 public class ExpiredMessagesTest extends CombinationTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesTest.class);
-    
+
     BrokerService broker;
     Connection connection;
     Session session;
@@ -60,7 +60,8 @@ public class ExpiredMessagesTest extends
     public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
     public boolean useTextMessage = true;
     public boolean useVMCursor = true;
-    
+    protected String brokerUri;
+
     public static Test suite() {
         return suite(ExpiredMessagesTest.class);
     }
@@ -68,78 +69,79 @@ public class ExpiredMessagesTest extends
     public static void main(String[] args) {
         junit.textui.TestRunner.run(suite());
     }
-	
-	protected void setUp() throws Exception {
+
+    protected void setUp() throws Exception {
         final boolean deleteAllMessages = true;
         broker = createBroker(deleteAllMessages, 100);
+        brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
     }
-	
-	public void testExpiredMessages() throws Exception {
-		
-		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
-		connection = factory.createConnection();
-		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-		producer = session.createProducer(destination);
-		producer.setTimeToLive(100);
-		consumer = session.createConsumer(destination);
-		connection.start();
-		final AtomicLong received = new AtomicLong();
-		
-		Thread consumerThread = new Thread("Consumer Thread") {
-			public void run() {
-				long start = System.currentTimeMillis();
-				try {
-					long end = System.currentTimeMillis();
-					while (end - start < 3000) {
-						if (consumer.receive(1000) != null) {
-						    received.incrementAndGet();
-						}
-						Thread.sleep(100);
-						end = System.currentTimeMillis();
-					}
-					consumer.close();
-				} catch (Throwable ex) {
-					ex.printStackTrace();
-				}
-			}
-		};
-		
+
+    public void testExpiredMessages() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+        connection = factory.createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        producer.setTimeToLive(100);
+        consumer = session.createConsumer(destination);
+        connection.start();
+        final AtomicLong received = new AtomicLong();
+
+        Thread consumerThread = new Thread("Consumer Thread") {
+            public void run() {
+                long start = System.currentTimeMillis();
+                try {
+                    long end = System.currentTimeMillis();
+                    while (end - start < 3000) {
+                        if (consumer.receive(1000) != null) {
+                            received.incrementAndGet();
+                        }
+                        Thread.sleep(100);
+                        end = System.currentTimeMillis();
+                    }
+                    consumer.close();
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
         consumerThread.start();
-		
-		final int numMessagesToSend = 10000;
-		Thread producingThread = new Thread("Producing Thread") {
+
+        final int numMessagesToSend = 10000;
+        Thread producingThread = new Thread("Producing Thread") {
             public void run() {
                 try {
-                	int i = 0;
-                	while (i++ < numMessagesToSend) {
-                		producer.send(session.createTextMessage("test"));
-                	}
-                	producer.close();
+                    int i = 0;
+                    while (i++ < numMessagesToSend) {
+                        producer.send(session.createTextMessage("test"));
+                    }
+                    producer.close();
                 } catch (Throwable ex) {
                     ex.printStackTrace();
                 }
             }
-		};
-		
-		producingThread.start();
-		
+        };
+
+        producingThread.start();
+
         consumerThread.join();
         producingThread.join();
         session.close();
-        
+
         final DestinationStatistics view = getDestinationStatistics(broker, destination);
 
         // wait for all to inflight to expire
         assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return view.getInflight().getCount() == 0;
-            }        
+            }
         }));
         assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount());
-        
+
         LOG.info("Stats: received: "  + received.get() + ", enqueues: " + view.getEnqueues().getCount()
+ ", dequeues: " + view.getDequeues().getCount()
                 + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount()
+ ", expiries: " + view.getExpired().getCount());
-        
+
         // wait for all sent to get delivered and expire
         assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
@@ -148,15 +150,15 @@ public class ExpiredMessagesTest extends
                 LOG.info("Stats: received: "  + received.get() + ", size= " + view.getMessages().getCount()
+ ", enqueues: " + view.getDequeues().getCount() + ", dequeues: " + view.getDequeues().getCount()
                         + ", dispatched: " + view.getDispatched().getCount() + ", inflight:
" + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
                 return oldEnqueues == view.getEnqueues().getCount();
-            }           
+            }
         }, 60*1000));
-        
+
 
         LOG.info("Stats: received: "  + received.get() + ", size= " + view.getMessages().getCount()
+ ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
                 + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount()
+ ", expiries: " + view.getExpired().getCount());
-        
+
         assertTrue("got at least what did not expire", received.get() >= view.getDequeues().getCount()
- view.getExpired().getCount());
-        
+
         assertTrue("all messages expired - queue size gone to zero " + view.getMessages().getCount(),
Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 LOG.info("Stats: received: "  + received.get() + ", size= " + view.getMessages().getCount()
+ ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
@@ -164,61 +166,61 @@ public class ExpiredMessagesTest extends
                 return view.getMessages().getCount() == 0;
             }
         }));
-        
+
         final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
         final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
-        
+
         final DestinationStatistics dlqView = getDestinationStatistics(broker, dlqDestination);
         LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: "
+ dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount()
                 + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: "
+ dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount());
-        
+
         Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return totalExpiredCount == dlqView.getMessages().getCount();
             }
         });
         assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount());
-        
+
         // memory check
         assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
-        assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
   
-        
+        assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
+
         // verify DLQ
         MessageConsumer dlqConsumer = createDlqConsumer(connection);
         final DLQListener dlqListener = new DLQListener();
         dlqConsumer.setMessageListener(dlqListener);
-        
+
         Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return totalExpiredCount == dlqListener.count;
             }
         }, 60 * 1000);
-        
+
         assertEquals("dlq returned all expired", dlqListener.count, totalExpiredCount);
-	}
+    }
 
     class DLQListener implements MessageListener {
-        
+
         int count = 0;
-        
+
         public void onMessage(Message message) {
             count++;
         }
-        
+
     };
-    
-	private MessageConsumer createDlqConsumer(Connection connection) throws Exception {
-	    return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
+
+    private MessageConsumer createDlqConsumer(Connection connection) throws Exception {
+        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
     }
 
     public void initCombosForTestRecoverExpiredMessages() {
-	    addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
-	}
-	
-	public void testRecoverExpiredMessages() throws Exception {
+        addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
+    }
+
+    public void testRecoverExpiredMessages() throws Exception {
 
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
-                "failover://tcp://localhost:61616");
+                "failover://"+brokerUri);
         connection = factory.createConnection();
         connection.start();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -247,7 +249,7 @@ public class ExpiredMessagesTest extends
         producingThread.join();
 
         DestinationStatistics view = getDestinationStatistics(broker, destination);
-        LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " 
+        LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
                 + view.getEnqueues().getCount() + ", dequeues: "
                 + view.getDequeues().getCount() + ", dispatched: "
                 + view.getDispatched().getCount() + ", inflight: "
@@ -263,7 +265,7 @@ public class ExpiredMessagesTest extends
         LOG.info("recovering broker");
         final boolean deleteAllMessages = false;
         broker = createBroker(deleteAllMessages, 5000);
-        
+
         Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 DestinationStatistics view = getDestinationStatistics(broker, destination);
@@ -273,25 +275,25 @@ public class ExpiredMessagesTest extends
                         + view.getDispatched().getCount() + ", inflight: "
                         + view.getInflight().getCount() + ", expiries: "
                         + view.getExpired().getCount());
-                    
+
                 return view.getMessages().getCount() == 0;
             }
         });
-        
+
         view = getDestinationStatistics(broker, destination);
         assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
         assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
     }
 
-	private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod)
throws Exception {
-	    BrokerService broker = new BrokerService();
+    private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod)
throws Exception {
+        BrokerService broker = new BrokerService();
         broker.setBrokerName("localhost");
         broker.setDestinations(new ActiveMQDestination[]{destination});
         AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
         adaptor.setDirectory(new File("target/expiredtest-data/"));
         adaptor.setForceRecoverReferenceStore(true);
         broker.setPersistenceAdapter(adaptor);
-        
+
         PolicyEntry defaultPolicy = new PolicyEntry();
         if (useVMCursor) {
             defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
@@ -302,17 +304,17 @@ public class ExpiredMessagesTest extends
         policyMap.setDefaultEntry(defaultPolicy);
         broker.setDestinationPolicy(policyMap);
         broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
-        broker.addConnector("tcp://localhost:61616");
+        broker.addConnector("tcp://localhost:0");
         broker.start();
         broker.waitUntilStarted();
         return broker;
-	}
-    
-    
-
-	protected void tearDown() throws Exception {
-		connection.stop();
-		broker.stop();
-		broker.waitUntilStopped();
-	}
+    }
+
+
+
+    protected void tearDown() throws Exception {
+        connection.stop();
+        broker.stop();
+        broker.waitUntilStopped();
+    }
 }



Mime
View raw message