activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1302977 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/kahadb/plist/ test/java/org/apache/activemq/bugs/
Date Tue, 20 Mar 2012 16:26:08 GMT
Author: tabish
Date: Tue Mar 20 16:26:07 2012
New Revision: 1302977

URL: http://svn.apache.org/viewvc?rev=1302977&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3780

ensure that non-persistent messages are cleaned up from temp storage when the Queue is deleted.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=1302977&r1=1302976&r2=1302977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java	Tue Mar 20 16:26:07 2012
@@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Store based Cursor for Queues
- * 
- * 
  */
 public class StoreQueueCursor extends AbstractPendingMessageCursor {
 
@@ -42,7 +40,7 @@ public class StoreQueueCursor extends Ab
 
     /**
      * Construct
-     * @param broker 
+     * @param broker
      * @param queue
      */
     public StoreQueueCursor(Broker broker,Queue queue) {
@@ -78,6 +76,7 @@ public class StoreQueueCursor extends Ab
     public synchronized void stop() throws Exception {
         started = false;
         if (nonPersistent != null) {
+            nonPersistent.clear();
             nonPersistent.stop();
             nonPersistent.gc();
         }
@@ -101,7 +100,7 @@ public class StoreQueueCursor extends Ab
             }
         }
     }
-    
+
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
@@ -155,9 +154,9 @@ public class StoreQueueCursor extends Ab
     public synchronized void reset() {
         nonPersistent.reset();
         persistent.reset();
-        pendingCount = persistent.size() + nonPersistent.size();        
+        pendingCount = persistent.size() + nonPersistent.size();
     }
-    
+
     public void release() {
         nonPersistent.release();
         persistent.release();
@@ -179,7 +178,7 @@ public class StoreQueueCursor extends Ab
     /**
      * Informs the Broker if the subscription needs to intervention to recover
      * it's state e.g. DurableTopicSubscriber may do
-     * 
+     *
      * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
@@ -208,8 +207,8 @@ public class StoreQueueCursor extends Ab
         }
         super.setMaxBatchSize(maxBatchSize);
     }
-    
-    
+
+
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         super.setMaxProducersToAudit(maxProducersToAudit);
         if (persistent != null) {
@@ -229,7 +228,7 @@ public class StoreQueueCursor extends Ab
             nonPersistent.setMaxAuditDepth(maxAuditDepth);
         }
     }
-    
+
     public void setEnableAudit(boolean enableAudit) {
         super.setEnableAudit(enableAudit);
         if (persistent != null) {
@@ -239,7 +238,7 @@ public class StoreQueueCursor extends Ab
             nonPersistent.setEnableAudit(enableAudit);
         }
     }
-    
+
     @Override
     public void setUseCache(boolean useCache) {
         super.setUseCache(useCache);
@@ -250,7 +249,7 @@ public class StoreQueueCursor extends Ab
             nonPersistent.setUseCache(useCache);
         }
     }
-    
+
     @Override
     public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
         super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=1302977&r1=1302976&r2=1302977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java	Tue Mar 20 16:26:07 2012
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.thread.Scheduler;
@@ -191,6 +192,10 @@ public class PListStore extends ServiceS
         }
     }
 
+    public Journal getJournal() {
+        return this.journal;
+    }
+
     public File getDirectory() {
         return directory;
     }
@@ -354,9 +359,9 @@ public class PListStore extends ServiceS
 
     public void run() {
         try {
-             if (isStopping()) {
+            if (isStopping()) {
                 return;
-             }
+            }
             final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
             final Set<Integer> candidates = journal.getFileMap().keySet();
             LOG.trace("Full gc candidate set:" + candidates);
@@ -370,7 +375,7 @@ public class PListStore extends ServiceS
                 List<PList> plists = null;
                 synchronized (indexLock) {
                     synchronized (this) {
-                        plists = new ArrayList(persistentLists.values());
+                        plists = new ArrayList<PList>(persistentLists.values());
                     }
                 }
                 for (PList list : plists) {
@@ -481,5 +486,4 @@ public class PListStore extends ServiceS
         String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
         return "PListStore:[" + path + " ]";
     }
-
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java?rev=1302977&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java	(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java	Tue Mar 20 16:26:07 2012
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TempStoreDataCleanupTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TempStoreDataCleanupTest.class);
+    private static final String QUEUE_NAME = TempStoreDataCleanupTest.class.getName() + "Queue";
+
+    private final String str = new String(
+        "QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
+
+    private BrokerService broker;
+    private String connectionUri;
+    private ExecutorService pool;
+    private String queueName;
+    private Random r = new Random();
+
+    @Before
+    public void setUp() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDedicatedTaskRunner(false);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
+        strategy.setProcessExpired(false);
+        strategy.setProcessNonPersistent(false);
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setQueue(">");
+        defaultPolicy.setOptimizedDispatch(true);
+        defaultPolicy.setDeadLetterStrategy(strategy);
+        defaultPolicy.setMemoryLimit(9000000);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+
+        broker.setDestinationPolicy(policyMap);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
+
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        pool = Executors.newFixedThreadPool(10);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+
+        if (pool != null) {
+            pool.shutdown();
+        }
+    }
+
+    @Test
+    public void testIt() throws Exception {
+
+        for (int i = 0; i < 2; i++) {
+            LOG.info("Started the test iteration: " + i + " using queueName = " + queueName);
+            queueName = QUEUE_NAME + i;
+            final CountDownLatch latch = new CountDownLatch(11);
+
+            pool.execute(new Runnable() {
+                @Override
+                public void run() {
+                    receiveAndDiscard100messages(latch);
+                }
+            });
+
+            for (int j = 0; j < 10; j++) {
+                pool.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        send10000messages(latch);
+                    }
+                });
+            }
+
+            LOG.info("Waiting on the send / receive latch");
+            latch.await(5, TimeUnit.MINUTES);
+            LOG.info("Resumed");
+
+            destroyQueue();
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+        final PListStore pa = broker.getTempDataStore();
+        assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(),
+            Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return pa.getJournal().getFileMap().size() == 1;
+                }
+            }, TimeUnit.MINUTES.toMillis(3))
+        );
+    }
+
+    public void destroyQueue() {
+        try {
+            Broker broker = this.broker.getBroker();
+            if (!broker.isStopped()) {
+                LOG.info("Removing: " + queueName);
+                broker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
+            }
+        } catch (Exception e) {
+            LOG.warn("Got an error while removing the test queue", e);
+        }
+    }
+
+    private void send10000messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session
+                    .createQueue(queueName));
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            activeMQConnection.start();
+            for (int i = 0; i < 10000; i++) {
+                TextMessage textMessage = session.createTextMessage();
+                textMessage.setText(generateBody(1000));
+                textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                producer.send(textMessage);
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                }
+            }
+            producer.close();
+        } catch (JMSException e) {
+            LOG.warn("Got an error while sending the messages", e);
+        } finally {
+            if (activeMQConnection != null) {
+                try {
+                    activeMQConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+        latch.countDown();
+    }
+
+    private void receiveAndDiscard100messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(
+                    session.createQueue(queueName));
+            activeMQConnection.start();
+            for (int i = 0; i < 100; i++) {
+                messageConsumer.receive();
+            }
+            messageConsumer.close();
+            LOG.info("Created and disconnected");
+        } catch (JMSException e) {
+            LOG.warn("Got an error while receiving the messages", e);
+        } finally {
+            if (activeMQConnection != null) {
+                try {
+                    activeMQConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+        latch.countDown();
+    }
+
+    private ActiveMQConnection createConnection(String id) throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        if (id != null) {
+            factory.setClientID(id);
+        }
+
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        return connection;
+    }
+
+    private String generateBody(int length) {
+
+        StringBuilder sb = new StringBuilder();
+        int te = 0;
+        for (int i = 1; i <= length; i++) {
+            te = r.nextInt(62);
+            sb.append(str.charAt(te));
+        }
+        return sb.toString();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message