activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1061859 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/ test/java/org/apache...
Date Fri, 21 Jan 2011 15:53:30 GMT
Author: gtully
Date: Fri Jan 21 15:53:29 2011
New Revision: 1061859

URL: http://svn.apache.org/viewvc?rev=1061859&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3149, https://issues.apache.org/jira/browse/AMQ-3145
expose current caching state of cursors (also via jmx), only use store async add if cache
is enabled, ensure cache is reenabled on add with 0 queue depth

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
      - copied, changed from r1061365, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExpiryHogTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Fri Jan 21 15:53:29 2011
@@ -307,9 +307,9 @@ public interface DestinationViewMBean {
     public void setMaxPageSize(@MBeanInfo("pageSize") int pageSize);
     
     /**
-     * @return true if caching is enabled of for the destination
+     * @return true if caching is allowed of for the destination
      */
-    @MBeanInfo("Caching is enabled")
+    @MBeanInfo("Caching is allowed")
     public boolean isUseCache();
     
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
Fri Jan 21 15:53:29 2011
@@ -168,4 +168,12 @@ public class QueueView extends Destinati
         }
         return false;
     }
+
+    public boolean isCacheEnabled() {
+        Queue queue = (Queue) destination;
+        if (queue.getMessages() != null){
+            return queue.getMessages().isCacheEnabled();
+        }
+        return false;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
Fri Jan 21 15:53:29 2011
@@ -169,4 +169,9 @@ public interface QueueViewMBean extends 
     @MBeanInfo("Number of messages available to be paged in by the cursor.")
     public int cursorSize();
 
+    /**
+     * @return true if caching is currently enabled of for the destination
+     */
+    @MBeanInfo("Caching is enabled")
+    boolean isCacheEnabled();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Jan 21 15:53:29 2011
@@ -667,7 +667,11 @@ public class Queue extends BaseDestinati
         try {
             if (store != null && message.isPersistent()) {        
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
-                result = store.asyncAddQueueMessage(context, message);
+                if (messages.isCacheEnabled()) {
+                    result = store.asyncAddQueueMessage(context, message);
+                } else {
+                    store.addMessage(context, message);
+                }
                 if (isReduceMemoryFootprint()) {
                     message.clearMarshalledState();
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Fri Jan 21 15:53:29 2011
@@ -45,6 +45,7 @@ public abstract class AbstractPendingMes
     protected boolean enableAudit=true;
     protected ActiveMQMessageAudit audit;
     protected boolean useCache=true;
+    protected boolean cacheEnabled=true;
     private boolean started=false;
     protected MessageReference last = null;
     protected final boolean prioritizedMessages;
@@ -327,4 +328,8 @@ public abstract class AbstractPendingMes
         return result;
 
     }
+
+    public boolean isCacheEnabled() {
+        return cacheEnabled;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Fri Jan 21 15:53:29 2011
@@ -34,7 +34,6 @@ public abstract class AbstractStoreCurso
     protected final Destination regionDestination;
     private final PendingList batchList;
     private Iterator<MessageReference> iterator = null;
-    protected boolean cacheEnabled=false;
     protected boolean batchResetNeeded = true;
     private boolean storeHasMessages = false;
     protected int size;
@@ -59,9 +58,7 @@ public abstract class AbstractStoreCurso
             resetBatch();
             this.size = getStoreSize();
             this.storeHasMessages=this.size > 0;
-            if (!this.storeHasMessages&&useCache) {
-                cacheEnabled=true;
-            }
+            cacheEnabled = !this.storeHasMessages&&useCache;
         } 
     }
     
@@ -171,6 +168,12 @@ public abstract class AbstractStoreCurso
     
     
     public final synchronized void addMessageLast(MessageReference node) throws Exception {
+        if (!cacheEnabled && size==0 && isStarted() && useCache && hasSpace()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on empty add");
+            }
+            cacheEnabled=true;
+        }
         if (cacheEnabled && hasSpace()) {
             recoverMessage(node.getMessage(),true);
             lastCachedId = node.getMessageId();
@@ -210,12 +213,6 @@ public abstract class AbstractStoreCurso
         if (last != null) {
             last.decrementReferenceCount();
         }
-        if (size==0 && isStarted() && useCache && hasSpace() && isStoreEmpty()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
-            }
-            cacheEnabled=true;
-        }
     }
 
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Fri Jan 21 15:53:29 2011
@@ -63,12 +63,10 @@ public class FilePendingMessageCursor ex
     /**
      * @param broker
      * @param name
-     * @param prioritizedMessages 
-     * @param store
+     * @param prioritizedMessages
      */
     public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
         super(prioritizedMessages);
-        this.useCache = false;
         this.broker = broker;
         // the store can be null if the BrokerService has persistence
         // turned off
@@ -201,6 +199,7 @@ public class FilePendingMessageCursor ex
                     if (hasSpace() || this.store == null) {
                         memoryList.add(node);
                         node.incrementReferenceCount();
+                        cacheEnabled = true;
                         return true;
                     }
                 }
@@ -248,6 +247,7 @@ public class FilePendingMessageCursor ex
                     if (hasSpace()) {
                         memoryList.addFirst(node);
                         node.incrementReferenceCount();
+                        cacheEnabled = true;
                         return;
                     }
                 }
@@ -428,6 +428,7 @@ public class FilePendingMessageCursor ex
 
             }
             memoryList.clear();
+            cacheEnabled = false;
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Fri Jan 21 15:53:29 2011
@@ -285,7 +285,7 @@ public interface PendingMessageCursor ex
     public void setUseCache(boolean useCache);
     
     /**
-     * @return true if a cache is being used
+     * @return true if a cache may be used
      */
     public boolean isUseCache();
     
@@ -294,5 +294,10 @@ public interface PendingMessageCursor ex
      * @param id
      */
     public void rollback(MessageId id);
+
+    /**
+     * @return true if cache is being used
+     */
+    public boolean isCacheEnabled();
    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Fri Jan 21 15:53:29 2011
@@ -180,7 +180,7 @@ public class StoreQueueCursor extends Ab
      * Informs the Broker if the subscription needs to intervention to recover
      * it's state e.g. DurableTopicSubscriber may do
      * 
-     * @see org.apache.activemq.region.cursors.PendingMessageCursor
+     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
     public boolean isRecoveryRequired() {
@@ -294,4 +294,18 @@ public class StoreQueueCursor extends Ab
         }
         return currentCursor;
     }
+
+    @Override
+    public boolean isCacheEnabled() {
+        cacheEnabled = isUseCache();
+        if (cacheEnabled) {
+            if (persistent != null) {
+                cacheEnabled &= persistent.isCacheEnabled();
+            }
+            if (nonPersistent != null) {
+                cacheEnabled &= nonPersistent.isCacheEnabled();
+            }
+        }
+        return cacheEnabled;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Fri Jan 21 15:53:29 2011
@@ -37,7 +37,6 @@ public class VMPendingMessageCursor exte
     
     public VMPendingMessageCursor(boolean prioritizedMessages) {
         super(prioritizedMessages);
-        this.useCache = false;
         if (this.prioritizedMessages) {
             this.list= new PrioritizedPendingList();
         }else {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExpiryHogTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExpiryHogTest.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExpiryHogTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExpiryHogTest.java Fri
Jan 21 15:53:29 2011
@@ -33,7 +33,7 @@ public class ExpiryHogTest extends JmsMu
 
     int numMessages = 4;
 
-    public void testHog() throws Exception {
+    public void testImmediateDispatchWhenCacheDisabled() throws Exception {
         ConnectionFactory f = createConnectionFactory();
         destination = createDestination();
         startConsumers(f, destination);
@@ -53,7 +53,7 @@ public class ExpiryHogTest extends JmsMu
         bs.setDestinationPolicy(policyMap);
 
         KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) bs.getPersistenceAdapter();
-        ad.setConcurrentStoreAndDispatchQueues(false);
+        ad.setConcurrentStoreAndDispatchQueues(true);
         return bs;
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Fri Jan 21 15:53:29 2011
@@ -156,6 +156,8 @@ public class MBeanTest extends EmbeddedB
         // check memory usage migration
         assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
         assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
+        assertTrue("use cache", queueNew.isUseCache());
+        assertTrue("cache enabled", queueNew.isCacheEnabled());
     }
 
     public void testRetryMessages() throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
Fri Jan 21 15:53:29 2011
@@ -96,6 +96,7 @@ public class QueuePurgeTest extends Test
         proxy.purge();
         assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
                 proxy.getQueueSize());
+        assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
     }
 
     public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {       

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java?rev=1061859&r1=1061858&r2=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
Fri Jan 21 15:53:29 2011
@@ -34,7 +34,7 @@ import org.apache.activemq.usage.SystemU
 
 /**
  * @author gtully
- * @see https://issues.apache.org/activemq/browse/AMQ-2020
+ * https://issues.apache.org/activemq/browse/AMQ-2020
  **/
 public class StoreQueueCursorNoDuplicateTest extends TestCase {
     ActiveMQQueue destination = new ActiveMQQueue("queue-"
@@ -83,6 +83,7 @@ public class StoreQueueCursorNoDuplicate
         underTest.setSystemUsage(systemUsage);
         underTest.setEnableAudit(false);
         underTest.start();
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
 
         final ConnectionContext contextNotInTx = new ConnectionContext();
         for (int i = 0; i < count; i++) {
@@ -93,6 +94,7 @@ public class StoreQueueCursorNoDuplicate
             underTest.addMessageLast(msg);
         }
 
+        assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
         int dequeueCount = 0;
 
         underTest.setMaxBatchSize(2);

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
(from r1061365, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java&r1=1061365&r2=1061859&rev=1061859&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java Fri
Jan 21 15:53:29 2011
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.region;
-
-import java.io.File;
+package org.apache.activemq.bugs;
 
+import java.util.concurrent.TimeUnit;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
@@ -29,9 +28,6 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-
-import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
@@ -39,12 +35,18 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-public class QueuePurgeTest extends TestCase {
-    private static final Log LOG = LogFactory.getLog(QueuePurgeTest.class);
+public class AMQ3145Test {
+    private static final Log LOG = LogFactory.getLog(AMQ3145Test.class);
     private final String MESSAGE_TEXT = new String(new byte[1024]);
     BrokerService broker;
     ConnectionFactory factory;
@@ -53,22 +55,26 @@ public class QueuePurgeTest extends Test
     Queue queue;
     MessageConsumer consumer;
 
-    protected void setUp() throws Exception {
+    @Before
+    public void createBroker() throws Exception {
+        createBroker(true);
+    }
+
+    public void createBroker(boolean deleteAll) throws Exception {
         broker = new BrokerService();
-        broker.setDataDirectory("target/activemq-data");
+        broker.setDeleteAllMessagesOnStartup(deleteAll);
+        broker.setDataDirectory("target/AMQ3145Test");
         broker.setUseJmx(true);
-        broker.setDeleteAllMessagesOnStartup(true);
-        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
-        persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/QueuePurgeTest"));
-        broker.setPersistenceAdapter(persistenceAdapter);
         broker.addConnector("tcp://localhost:0");
         broker.start();
         factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
         connection = factory.createConnection();
         connection.start();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
     }
 
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         if (consumer != null) {
             consumer.close();
         }
@@ -78,41 +84,20 @@ public class QueuePurgeTest extends Test
         broker.stop();
     }
 
-    public void testPurgeQueueWithActiveConsumer() throws Exception {
-        createProducerAndSendMessages(10000);
-        QueueViewMBean proxy = getProxyToQueueViewMBean();
-        createConsumer();
-        proxy.purge();
-        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
-                proxy.getQueueSize());
-    }
-    
-    
-    public void testPurgeLargeQueue() throws Exception {       
-        applyBrokerSpoolingPolicy();
-        createProducerAndSendMessages(90000);
-        QueueViewMBean proxy = getProxyToQueueViewMBean();
-        LOG.info("purging..");
-        proxy.purge();
-        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
-                proxy.getQueueSize());
-    }
-
-    public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {       
-        applyBrokerSpoolingPolicy();
-        final int exprityPeriod = 1000;
-        applyExpiryDuration(exprityPeriod);
-        createProducerAndSendMessages(90000);
+    @Test
+    public void testCacheDisableReEnable() throws Exception {
+        createProducerAndSendMessages(1);
         QueueViewMBean proxy = getProxyToQueueViewMBean();
-        LOG.info("waiting for expiry to kick in a bunch of times to verify it does not blow mem");
-        Thread.sleep(10000);
-        assertEquals("Queue size is has not changed " + proxy.getQueueSize(), 90000,
-                proxy.getQueueSize());
-    }
-    
-
-    private void applyExpiryDuration(int i) {
-        broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i);
+        assertTrue("cache is enabled", proxy.isCacheEnabled());
+        tearDown();
+        createBroker(false);
+        proxy = getProxyToQueueViewMBean();
+        assertEquals("one pending message", 1, proxy.getQueueSize());
+        assertTrue("cache is disabled when there is a pending message", !proxy.isCacheEnabled());
+
+        createConsumer(1);
+        createProducerAndSendMessages(1);
+        assertTrue("cache is enabled again on next send when there are no messages", proxy.isCacheEnabled());
     }
 
     private void applyBrokerSpoolingPolicy() {
@@ -125,21 +110,6 @@ public class QueuePurgeTest extends Test
         broker.setDestinationPolicy(policyMap);
     }
 
-    
-    public void testPurgeLargeQueueWithConsumer() throws Exception {       
-        applyBrokerSpoolingPolicy();
-        createProducerAndSendMessages(90000);
-        QueueViewMBean proxy = getProxyToQueueViewMBean();
-        createConsumer();
-        long start = System.currentTimeMillis();
-        LOG.info("purging..");
-        proxy.purge();
-        LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms");
-        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
-                proxy.getQueueSize());
-        assertEquals("usage goes to duck", 0, proxy.getMemoryPercentUsage());
-    }
-
     private QueueViewMBean getProxyToQueueViewMBean()
             throws MalformedObjectNameException, JMSException {
         ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
@@ -152,7 +122,6 @@ public class QueuePurgeTest extends Test
     }
 
     private void createProducerAndSendMessages(int numToSend) throws Exception {
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         queue = session.createQueue("test1");
         MessageProducer producer = session.createProducer(queue);
         for (int i = 0; i < numToSend; i++) {
@@ -165,13 +134,13 @@ public class QueuePurgeTest extends Test
         producer.close();
     }
 
-    private void createConsumer() throws Exception {
+    private void createConsumer(int numToConsume) throws Exception {
         consumer = session.createConsumer(queue);
         // wait for buffer fill out
-        Thread.sleep(5 * 1000);
-        for (int i = 0; i < 500; ++i) {
-            Message message = consumer.receive();
+        for (int i = 0; i < numToConsume; ++i) {
+            Message message = consumer.receive(2000);
             message.acknowledge();
         }
+        consumer.close();
     }
-}
+}
\ No newline at end of file



Mime
View raw message