activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1233367 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
Date Thu, 19 Jan 2012 13:33:20 GMT
Author: gtully
Date: Thu Jan 19 13:33:19 2012
New Revision: 1233367

URL: http://svn.apache.org/viewvc?rev=1233367&view=rev
Log:
move test to KahaDb and add simple validation of sendFailIfNoSpace and temp usage for
non-persistent messages. Producer stops on the exception, consumer picks up all the messages

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java?rev=1233367&r1=1233366&r2=1233367&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java Thu Jan 19 13:33:19 2012
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.bugs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -29,29 +26,26 @@ import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
-import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.TempUsage;
-import org.apache.activemq.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 
-public class TempStorageBlockedBrokerTest {
+public class TempStorageBlockedBrokerTest extends TestSupport {
 
-    public boolean consumeAll = false;
     public int deliveryMode = DeliveryMode.PERSISTENT;
 
     private static final Logger LOG = LoggerFactory.getLogger(TempStorageBlockedBrokerTest.class);
@@ -61,12 +55,11 @@ public class TempStorageBlockedBrokerTes
     AtomicInteger messagesSent = new AtomicInteger(0);
     AtomicInteger messagesConsumed = new AtomicInteger(0);
 
-    protected long messageReceiveTimeout = 10L;
+    protected long messageReceiveTimeout = 10000L;
 
     Destination destination = new ActiveMQTopic("FooTwo");
 
-    @Test
-    public void runProducerWithHungConsumer() throws Exception {
+    public void testRunProducerWithHungConsumer() throws Exception {
 
         final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
 
@@ -135,7 +128,7 @@ public class TempStorageBlockedBrokerTes
 
         while ((m = consumer.receive(messageReceiveTimeout)) != null) {
             count++;
-            if (count != 0 && count%10 == 0) {
+            if (count != 0 && count%100 == 0) {
                 LOG.info("Recieved Message (" + count + "):" + m);
             }
             messagesConsumed.incrementAndGet();
@@ -154,8 +147,6 @@ public class TempStorageBlockedBrokerTes
 
         final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
         LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
-        // assertTrue("some temp store has been used", tempUsageBySubscription
-        // != origTempUsage);
 
         producerConnection.close();
         consumerConnection.close();
@@ -173,7 +164,56 @@ public class TempStorageBlockedBrokerTes
                 MESSAGES_COUNT);
     }
 
-    @Before
+    public void testFillTempAndConsume() throws Exception {
+
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+        destination = new ActiveMQQueue("Foo");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618");
+        final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
+        // so we can easily catch the ResourceAllocationException on send
+        producerConnection.setAlwaysSyncSend(true);
+        producerConnection.start();
+
+        Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        try {
+            while (true) {
+                Message message = session.createTextMessage(new String(buf) + messagesSent.toString());
+                producer.send(message);
+                messagesSent.incrementAndGet();
+                if (messagesSent.get() % 100 == 0) {
+                    LOG.info("Sent Message " + messagesSent.get());
+                    LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
+                }
+            }
+        } catch (ResourceAllocationException ex) {
+            LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get());
+        }
+
+        // consume all sent
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.start();
+
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+
+        while (consumer.receive(messageReceiveTimeout) != null) {
+            messagesConsumed.incrementAndGet();
+            if (messagesConsumed.get() % 1000 == 0) {
+                LOG.info("received Message " + messagesConsumed.get());
+                LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage());
+            }
+        }
+
+        assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(),
+                messagesSent.get());
+    }
+
+    @Override
     public void setUp() throws Exception {
 
         broker = new BrokerService();
@@ -183,18 +223,8 @@ public class TempStorageBlockedBrokerTes
         broker.setAdvisorySupport(false);
         broker.setDeleteAllMessagesOnStartup(true);
 
-        AMQPersistenceAdapter persistence = new AMQPersistenceAdapter();
-        persistence.setSyncOnWrite(false);
-        File directory = new File("target" + File.separator + "activemq-data");
-        persistence.setDirectory(directory);
-        File tmpDir = new File(directory, "tmp");
-        IOHelper.deleteChildren(tmpDir);
-        PListStore tempStore = new PListStore();
-        tempStore.setDirectory(tmpDir);
-        tempStore.setJournalMaxFileLength(50*1024);
-        tempStore.start();
-
-        SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore);
+        setDefaultPersistenceAdapter(broker);
+        SystemUsage sysUsage = broker.getSystemUsage();
         MemoryUsage memUsage = new MemoryUsage();
         memUsage.setLimit((1024 * 1024));
         StoreUsage storeUsage = new StoreUsage();
@@ -216,14 +246,12 @@ public class TempStorageBlockedBrokerTes
 
         broker.setDestinationPolicy(policyMap);
         broker.setSystemUsage(sysUsage);
-        broker.setTempDataStore(tempStore);
-        broker.setPersistenceAdapter(persistence);
 
         broker.addConnector("tcp://localhost:61618").setName("Default");
         broker.start();
     }
 
-    @After
+    @Override
     public void tearDown() throws Exception {
         if (broker != null) {
             broker.stop();



Mime
View raw message