activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1028277 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
Date Thu, 28 Oct 2010 12:36:36 GMT
Author: gtully
Date: Thu Oct 28 12:36:35 2010
New Revision: 1028277

URL: http://svn.apache.org/viewvc?rev=1028277&view=rev
Log:
Verify that all journal files are reclaimed after duplicate messages with concurrentStoreAndDispatchQueues=false;
durable subs must also be unsubscribed. See https://issues.apache.org/activemq/browse/AMQ-2584

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java?rev=1028277&r1=1028276&r2=1028277&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java Thu Oct 28 12:36:35 2010
@@ -16,19 +16,21 @@
  */
 package org.apache.activemq.bugs;
 
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.Arrays;
 import java.util.Properties;
+import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
-import junit.framework.Test;
-
+import javax.jms.TopicSubscriber;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -53,95 +55,134 @@ public class AMQ2584ConcurrentDlqTest ex
     ActiveMQTopic topic;
 
     ActiveMQConnection consumerConnection = null, producerConnection = null, dlqConnection = null;
+    Session consumerSession;
     Session producerSession;
     MessageProducer producer;
-    final int minPercentUsageForStore = 10;
+    Vector<TopicSubscriber> duralbeSubs = new Vector<TopicSubscriber>();
     final int numMessages = 1000;
+    final int numDurableSubs = 2;
 
     String data;
+    private long dlqConsumerLastReceivedTimeStamp;
+    private AtomicLong dlqReceivedCount = new AtomicLong(0);
+
+    // 2 deliveries of each message, 3 producers
+    CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2 * numMessages) * numDurableSubs) - 1);
+    // should get at least numMessages, possibly more
+    CountDownLatch dlqConsumerLatch = new CountDownLatch((numMessages - 1));
 
     public void testSize() throws Exception {
-        CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2*numMessages) *3) -1);
-        CountDownLatch dlqConsumerLatch = new CountDownLatch((numMessages) -1);
         openConsumer(redeliveryConsumerLatch);
         openDlqConsumer(dlqConsumerLatch);
-               
+
 
         assertEquals(0, broker.getAdminView().getStorePercentUsage());
 
         for (int i = 0; i < numMessages; i++) {
             sendMessage(false);
         }
-        
+
         final BrokerView brokerView = broker.getAdminView();
 
         broker.getSystemUsage().getStoreUsage().isFull();
-        LOG.info("store percent usage: "+brokerView.getStorePercentUsage());
-        //assertTrue("some store in use", broker.getAdminView().getStorePercentUsage() > minPercentUsageForStore);
-        assertTrue("redelivery consumer got all it needs", redeliveryConsumerLatch.await(60, TimeUnit.SECONDS));
+        LOG.info("store percent usage: " + brokerView.getStorePercentUsage());
+        assertTrue("redelivery consumer got all it needs, remaining: "
+                + redeliveryConsumerLatch.getCount(), redeliveryConsumerLatch.await(60, TimeUnit.SECONDS));
         assertTrue("dql  consumer got all it needs", dlqConsumerLatch.await(60, TimeUnit.SECONDS));
         closeConsumer();
 
         LOG.info("Giving dlq a chance to clear down once topic consumer is closed");
+
+        // consumer all of the duplicates that arrived after the first ack
+        closeDlqConsumer();
+
         //get broker a chance to clean obsolete messages, wait 2*cleanupInterval
         Thread.sleep(5000);
 
-        // consumer some of the duplicates that arrived after the first ack
-        closeDlqConsumer();
-        int numFiles = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getDirectory().list().length;
+        FilenameFilter justLogFiles = new FilenameFilter() {
+            public boolean accept(File file, String s) {
+                return s.endsWith(".log");
+            }
+        };
+        int numFiles = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles).length;
+        if (numFiles > 2) {
+            LOG.info(Arrays.toString(((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles)));
+        }
         LOG.info("num files: " + numFiles);
-        assertTrue("kahaDB dir should contain few db files,but definitely less than 10, is: " + numFiles,10>numFiles);
-		}
-       
+        assertEquals("kahaDB dir should contain 1 db file,is: " + numFiles, 1, numFiles);
+    }
 
     private void openConsumer(final CountDownLatch latch) throws Exception {
         consumerConnection = (ActiveMQConnection) createConnection();
         consumerConnection.setClientID("cliID");
         consumerConnection.start();
-        final Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         MessageListener listener = new MessageListener() {
             public void onMessage(Message message) {
                 latch.countDown();
                 try {
-                    session.recover();
+                    consumerSession.recover();
                 } catch (Exception ignored) {
                     ignored.printStackTrace();
                 }
-
             }
         };
 
-        session.createDurableSubscriber(topic, "subName1").setMessageListener(listener);
-        session.createDurableSubscriber(topic, "subName2").setMessageListener(listener);
-        session.createDurableSubscriber(topic, "subName3").setMessageListener(listener);
-    }
-    private void openDlqConsumer(final CountDownLatch received)throws Exception{
-    	
-    	dlqConnection  = (ActiveMQConnection) createConnection();
-    	Session dlqSession = dlqConnection .createSession(false, Session.AUTO_ACKNOWLEDGE);
-    	MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
-    	dlqConsumer.setMessageListener(new MessageListener() {
-          public void onMessage(Message message) {
-              if (received.getCount() % 200 == 0) {
-                  LOG.info("remaining on DLQ: " + received.getCount());
-              }
-              received.countDown();
-          }
-    	});
-    	dlqConnection.start();
-    } 
-    
-    
+        for (int i = 1; i <= numDurableSubs; i++) {
+            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, "subName" + i);
+            sub.setMessageListener(listener);
+            duralbeSubs.add(sub);
+        }
+    }
+
+    private void openDlqConsumer(final CountDownLatch received) throws Exception {
+
+        dlqConnection = (ActiveMQConnection) createConnection();
+        Session dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+        dlqConsumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                if (received.getCount() > 0 && received.getCount() % 200 == 0) {
+                    LOG.info("remaining on DLQ: " + received.getCount());
+                }
+                received.countDown();
+                dlqConsumerLastReceivedTimeStamp = System.currentTimeMillis();
+                dlqReceivedCount.incrementAndGet();
+            }
+        });
+        dlqConnection.start();
+    }
+
+
     private void closeConsumer() throws JMSException {
-        if (consumerConnection != null)
+        for (TopicSubscriber sub : duralbeSubs) {
+            sub.close();
+        }
+        if (consumerSession != null) {
+            for (int i = 1; i <= numDurableSubs; i++) {
+                consumerSession.unsubscribe("subName" + i);
+            }
+        }
+        if (consumerConnection != null) {
             consumerConnection.close();
-        consumerConnection = null;
+            consumerConnection = null;
+        }
     }
-    private void closeDlqConsumer() throws JMSException {
-        if (dlqConnection != null)
-        	dlqConnection.close();
-        dlqConnection = null;
+
+    private void closeDlqConsumer() throws JMSException, InterruptedException {
+        final long limit = System.currentTimeMillis() + 30 * 1000;
+        if (dlqConsumerLastReceivedTimeStamp > 0) {
+            while (System.currentTimeMillis() < dlqConsumerLastReceivedTimeStamp + 5000
+                    && System.currentTimeMillis() < limit) {
+                LOG.info("waiting for DLQ do drain, receivedCount: " + dlqReceivedCount);
+                TimeUnit.SECONDS.sleep(1);
+            }
+        }
+        if (dlqConnection != null) {
+            dlqConnection.close();
+            dlqConnection = null;
+        }
     }
 
     private void sendMessage(boolean filter) throws Exception {
@@ -171,10 +212,6 @@ public class AMQ2584ConcurrentDlqTest ex
         if (deleteMessages) {
             broker.setDeleteAllMessagesOnStartup(true);
         }
-        KahaDBPersistenceAdapter persistenceAdapter=new KahaDBPersistenceAdapter();
-        persistenceAdapter.setEnableJournalDiskSyncs(false);
-        
-        broker.setPersistenceAdapter(persistenceAdapter);
         configurePersistenceAdapter(broker.getPersistenceAdapter());
         broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000);
         broker.start();
@@ -187,7 +224,8 @@ public class AMQ2584ConcurrentDlqTest ex
         properties.put("maxFileLength", maxFileLengthVal);
         properties.put("cleanupInterval", "2000");
         properties.put("checkpointInterval", "2000");
-       
+        properties.put("concurrentStoreAndDispatchQueues", "false");
+
         IntrospectionSupport.setProperties(persistenceAdapter, properties);
     }
 



Mime
View raw message