activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r911759 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/usage/ main/java/org/apache/activemq/util/ test/...
Date Fri, 19 Feb 2010 09:35:16 GMT
Author: gtully
Date: Fri Feb 19 09:35:16 2010
New Revision: 911759

URL: http://svn.apache.org/viewvc?rev=911759&view=rev
Log:
merge -c 911650 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2610
- org.apache.activemq.broker.region.cursors.PendingMessageCursor.next() now increments the
reference count before returning a message reference. This allows control over references
when browsing or peeking rather than moving/removing.

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java
      - copied unchanged from r911650, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Feb 19 09:35:16 2010
@@ -164,6 +164,7 @@
                 pending.reset();
                 while (pending.hasNext()) {
                     MessageReference node = pending.next();
+                    node.decrementReferenceCount();
                     if (node.getMessageId().equals(mdn.getMessageId())) {
                         // Synchronize between dispatched list and removal of messages from pending list
                         // related to remove subscription action
@@ -575,6 +576,7 @@
                             // related to remove subscription action
                             synchronized(dispatchLock) {
                                 pending.remove();
+                                node.decrementReferenceCount();
                                 if( !isDropped(node) && canDispatch(node)) {
 
                                     // Message may have been sitting in the pending

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Feb 19 09:35:16 2010
@@ -805,32 +805,34 @@
     }
 
     public Message[] browse() {
-        List<Message> l = new ArrayList<Message>();
-        doBrowse(l, getMaxBrowsePageSize());
-        return l.toArray(new Message[l.size()]);
+        List<Message> browseList = new ArrayList<Message>();
+        doBrowse(browseList, getMaxBrowsePageSize());
+        return browseList.toArray(new Message[browseList.size()]);
     }
 
-    public void doBrowse(List<Message> l, int max) {
+    public void doBrowse(List<Message> browseList, int max) {
         final ConnectionContext connectionContext = createConnectionContext();
         try {
             pageInMessages(false);
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
             synchronized (dispatchMutex) {
                 synchronized (pagedInPendingDispatch) {
-                    addAll(pagedInPendingDispatch, l, max, toExpire);
+                    addAll(pagedInPendingDispatch, browseList, max, toExpire);
                     for (MessageReference ref : toExpire) {
                         pagedInPendingDispatch.remove(ref);
                         if (broker.isExpired(ref)) {
+                            LOG.debug("expiring from pagedInPending: " + ref);
                             messageExpired(connectionContext, ref);
                         }
                     }
                 }
                 toExpire.clear();
                 synchronized (pagedInMessages) {
-                    addAll(pagedInMessages.values(), l, max, toExpire);
+                    addAll(pagedInMessages.values(), browseList, max, toExpire);
                 }
                 for (MessageReference ref : toExpire) {
                     if (broker.isExpired(ref)) {
+                        LOG.debug("expiring from pagedInMessages: " + ref);
                         messageExpired(connectionContext, ref);
                     } else {
                         synchronized (pagedInMessages) {
@@ -839,23 +841,25 @@
                     }
                 }
 
-                if (l.size() < getMaxBrowsePageSize()) {
+                if (browseList.size() < getMaxBrowsePageSize()) {
                     synchronized (messages) {
                         try {
                             messages.reset();
-                            while (messages.hasNext() && l.size() < max) {
+                            while (messages.hasNext() && browseList.size() < max) {
                                 MessageReference node = messages.next();
                                 if (node.isExpired()) {
                                     if (broker.isExpired(node)) {
+                                        LOG.debug("expiring from messages: " + node);
                                         messageExpired(connectionContext, createMessageReference(node.getMessage()));
                                     }
                                     messages.remove();
                                 } else {
                                     messages.rollback(node.getMessageId());
-                                    if (l.contains(node.getMessage()) == false) {
-                                        l.add(node.getMessage());
+                                    if (browseList.contains(node.getMessage()) == false) {
+                                        browseList.add(node.getMessage());
                                     }
                                 }
+                                node.decrementReferenceCount();
                             }
                         } finally {
                             messages.release();
@@ -894,6 +898,7 @@
                     while (messages.hasNext()) {
                         try {
                             MessageReference r = messages.next();
+                            r.decrementReferenceCount();
                             messages.rollback(r.getMessageId());
                             if (msgId.equals(r.getMessageId())) {
                                 Message m = r.getMessage();
@@ -1441,12 +1446,13 @@
                         messages.reset();
                         while (messages.hasNext() && count < toPageIn) {
                             MessageReference node = messages.next();
-                            node.incrementReferenceCount();
                             messages.remove();
                             QueueMessageReference ref = createMessageReference(node.getMessage());
                             if (ref.isExpired()) {
                                 if (broker.isExpired(ref)) {
                                     messageExpired(createConnectionContext(), ref);
+                                } else {
+                                    ref.decrementReferenceCount();
                                 }
                             } else {
                                 result.add(ref);
@@ -1464,6 +1470,8 @@
                         if (!pagedInMessages.containsKey(ref.getMessageId())) {
                             pagedInMessages.put(ref.getMessageId(), ref);
                             resultList.add(ref);
+                        } else {
+                            ref.decrementReferenceCount();
                         }
                     }
                 }
@@ -1654,7 +1662,6 @@
                         messages.reset();
                         while (messages.hasNext()) {
                             MessageReference node = messages.next();
-                            node.incrementReferenceCount();
                             messages.remove();
                             if (messageId.equals(node.getMessageId())) {
                                 message = this.createMessageReference(node.getMessage());

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Fri Feb 19 09:35:16 2010
@@ -161,6 +161,7 @@
             matched.reset();
             while (matched.hasNext()) {
                 MessageReference node = matched.next();
+                node.decrementReferenceCount();
                 if (broker.isExpired(node)) {
                     matched.remove();
                     dispatchedCounter.incrementAndGet();
@@ -181,6 +182,7 @@
                 matched.reset();
                 while (matched.hasNext()) {
                     MessageReference node = matched.next();
+                    node.decrementReferenceCount();
                     if (node.getMessageId().equals(mdn.getMessageId())) {
                         matched.remove();
                         dispatchedCounter.incrementAndGet();
@@ -384,8 +386,8 @@
                     matched.reset();
                    
                     while (matched.hasNext() && !isFull()) {
-                        MessageReference message = (MessageReference) matched
-                                .next();
+                        MessageReference message = (MessageReference) matched.next();
+                        message.decrementReferenceCount();
                         matched.remove();
                         // Message may have been sitting in the matched list a
                         // while

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Fri Feb 19 09:35:16 2010
@@ -151,6 +151,9 @@
             result = this.iterator.next().getValue();
         }
         last = result;
+        if (result != null) {
+            result.incrementReferenceCount();
+        }
         return result;
     }
     

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Fri Feb 19 09:35:16 2010
@@ -155,7 +155,9 @@
         LinkedList<MessageReference> result = new LinkedList<MessageReference>();
         int count = 0;
         for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
-            result.add(i.next());
+            MessageReference ref = i.next();
+            ref.incrementReferenceCount();
+            result.add(ref);
             count++;
         }
         if (count < maxItems && !isDiskListEmpty()) {
@@ -271,8 +273,8 @@
             // got from disk
             message.setRegionDestination(regionDestination);
             message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-            message.incrementReferenceCount();
         }
+        message.incrementReferenceCount();
         return message;
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Fri Feb 19 09:35:16 2010
@@ -108,7 +108,7 @@
     boolean hasNext();
 
     /**
-     * @return the next pending message
+     * @return the next pending message with its reference count increment
      */
     MessageReference next();
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Fri Feb 19 09:35:16 2010
@@ -117,6 +117,9 @@
      */
     public synchronized MessageReference next() {
         last = (MessageReference)iter.next();
+        if (last != null) {
+            last.incrementReferenceCount();
+        }
         return last;
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
Fri Feb 19 09:35:16 2010
@@ -407,7 +407,7 @@
     }
     
     static {
-        executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+        executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, "Usage Async Task");
                 thread.setDaemon(true);

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
Fri Feb 19 09:35:16 2010
@@ -26,13 +26,14 @@
      * track the stack trace of callers
      * @param name the method being tracked
      */
-    public static void track(String name) {
+    public static void track(final String name) {
         Tracker t;
+        final String key = name.intern();
         synchronized(trackers) {
-            t = trackers.get(name);
+            t = trackers.get(key);
             if (t == null) {
                 t = new Tracker();
-                trackers.put(name, t);
+                trackers.put(key, t);
             }
         }
         t.track();
@@ -56,23 +57,30 @@
 @SuppressWarnings("serial")
 class Trace extends Throwable {
     public int count = 1;
-    public final int size;
+    public final long id;
     Trace() {
         super();
-        size = this.getStackTrace().length;
+        id = calculateIdentifier();
+    }
+    private long calculateIdentifier() {
+        int len = 0;
+        for (int i=0; i<this.getStackTrace().length; i++) {
+            len += this.getStackTrace()[i].toString().intern().hashCode();
+        }
+        return len;
     }
 }
 
 @SuppressWarnings("serial")
-class Tracker extends HashMap<Integer, Trace> {
+class Tracker extends HashMap<Long, Trace> {
     public void track() {
         Trace current = new Trace();
         synchronized(this) {
-            Trace exist = get(current.size);
+            Trace exist = get(current.id);
             if (exist != null) {
                 exist.count++;
             } else {
-                put(current.size, current);
+                put(current.id, current);
             }
         }
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/TestSupport.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
Fri Feb 19 09:35:16 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq;
 
 import java.io.File;
+import java.util.Map;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -25,6 +26,11 @@
 import javax.jms.TextMessage;
 
 import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -139,4 +145,32 @@
             recursiveDelete(new File(System.getProperty("derby.system.home")));
         }
     }
+    
+    public static DestinationStatistics getDestinationStatistics(BrokerService broker, ActiveMQDestination destination) {
+        DestinationStatistics result = null;
+        org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
+        if (dest != null) {
+            result = dest.getDestinationStatistics();
+        }
+        return result;
+    }
+    
+    public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
+        org.apache.activemq.broker.region.Destination result = null;
+        for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
+            if (dest.getName().equals(destination.getPhysicalName())) {
+                result = dest;
+                break;
+            }
+        }
+        return result;
+    }
+
+    private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap(BrokerService target,
+            ActiveMQDestination destination) {
+        RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
+        return destination.isQueue() ?
+                    regionBroker.getQueueRegion().getDestinationMap() :
+                        regionBroker.getTopicRegion().getDestinationMap();
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
Fri Feb 19 09:35:16 2010
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.broker;
 
+import javax.jms.JMSException;
+
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.ThreadTracker;
+
 public class TopicSubscriptionTest extends QueueSubscriptionTest {
 
     protected void setUp() throws Exception {
@@ -23,6 +29,11 @@
         durable = true;
         topic = true;
     }
+    
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        ThreadTracker.result();
+    }
 
     public void testManyProducersManyConsumers() throws Exception {
         consumerCount = 40;
@@ -34,6 +45,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
@@ -46,6 +58,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
@@ -58,6 +71,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
@@ -82,6 +96,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerManyConsumersFewMessages() throws Exception {
@@ -94,6 +109,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerManyConsumersManyMessages() throws Exception {
@@ -106,6 +122,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
 
@@ -119,5 +136,12 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
+    
+    private void assertDestinationMemoryUsageGoesToZero() throws Exception {
+        assertEquals("destination memory is back to 0", 0, 
+                TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
+    }
+
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
Fri Feb 19 09:35:16 2010
@@ -24,35 +24,38 @@
 import java.util.Date;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import junit.framework.TestCase;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.usage.SystemUsage;
 
-import junit.framework.TestCase;
-
 public class StoreBasedCursorTest extends TestCase {
     protected String bindAddress = "tcp://localhost:60706";
     BrokerService broker;
-    ConnectionFactory factory;
+    ActiveMQConnectionFactory factory;
     Connection connection;
     Session session;
     Queue queue;
     int messageSize = 1024;
-    int memoryLimit = 5 * messageSize;
+    // actual message is messageSize*2, and 4*MessageSize would allow 2 messages be delivered, but the flush of the cache is async so the flush
+    // triggered on 2nd message maxing out the usage may not be in effect for the 3rd message to succeed. Making the memory usage more lenient
+    // gives the usageChange listener in the cursor an opportunity to kick in.
+    int memoryLimit = 12 * messageSize;
     
     protected void setUp() throws Exception {
         super.setUp();
         if (broker == null) {
             broker = new BrokerService();
+            broker.setAdvisorySupport(false);
         }
     }
 
@@ -67,6 +70,7 @@
     protected void start() throws Exception {
         broker.start();
         factory = new ActiveMQConnectionFactory("vm://localhost?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
         connection = factory.createConnection();
         connection.start();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
Fri Feb 19 09:35:16 2010
@@ -99,6 +99,7 @@
         underTest.reset();
         while (underTest.hasNext() && dequeueCount < count) {
             MessageReference ref = underTest.next();
+            ref.decrementReferenceCount();
             underTest.remove();
             assertEquals(dequeueCount++, ref.getMessageId()
                     .getProducerSequenceId());

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Fri Feb 19 09:35:16 2010
@@ -43,6 +43,9 @@
 import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import static org.apache.activemq.TestSupport.getDestination;
+import static org.apache.activemq.TestSupport.getDestinationStatistics;
+
 
 public class ExpiredMessagesTest extends CombinationTestSupport {
 
@@ -124,7 +127,7 @@
         producingThread.join();
         session.close();
         
-        final DestinationStatistics view = this.getDestinationStatistics(destination);
+        final DestinationStatistics view = getDestinationStatistics(broker, destination);
 
         // wait for all to inflight to expire
         assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
@@ -165,7 +168,7 @@
         final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
         final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
         
-        final DestinationStatistics dlqView = getDestinationStatistics(dlqDestination);
+        final DestinationStatistics dlqView = getDestinationStatistics(broker, dlqDestination);
         LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount()
                 + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount());
         
@@ -177,8 +180,8 @@
         assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount());
         
         // memory check
-        assertEquals("memory usage is back to duck egg", 0, this.getDestination(destination).getMemoryUsage().getPercentUsage());
-        assertTrue("memory usage is increased ", 0 < this.getDestination(dlqDestination).getMemoryUsage().getPercentUsage());   
+        assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
+        assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());   
         
         // verify DLQ
         MessageConsumer dlqConsumer = createDlqConsumer(connection);
@@ -243,7 +246,7 @@
         producingThread.start();
         producingThread.join();
 
-        DestinationStatistics view = getDestinationStatistics(destination);
+        DestinationStatistics view = getDestinationStatistics(broker, destination);
         LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " 
                 + view.getEnqueues().getCount() + ", dequeues: "
                 + view.getDequeues().getCount() + ", dispatched: "
@@ -263,7 +266,7 @@
         
         Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
-                DestinationStatistics view = getDestinationStatistics(destination);
+                DestinationStatistics view = getDestinationStatistics(broker, destination);
                 LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
                         + view.getEnqueues().getCount() + ", dequeues: "
                         + view.getDequeues().getCount() + ", dispatched: "
@@ -275,7 +278,7 @@
             }
         });
         
-        view = getDestinationStatistics(destination);
+        view = getDestinationStatistics(broker, destination);
         assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
         assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
     }
@@ -304,26 +307,7 @@
         return broker;
 	}
     
-    private DestinationStatistics getDestinationStatistics(ActiveMQDestination destination) {
-        DestinationStatistics result = null;
-        org.apache.activemq.broker.region.Destination dest = getDestination(destination);
-        if (dest != null) {
-            result = dest.getDestinationStatistics();
-        }
-        return result;
-    }
     
-    private org.apache.activemq.broker.region.Destination getDestination(ActiveMQDestination destination) {
-        org.apache.activemq.broker.region.Destination result = null;
-        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
-        for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
-            if (dest.getName().equals(destination.getPhysicalName())) {
-                result = dest;
-                break;
-            }
-        }
-        return result;
-    }
 
 	protected void tearDown() throws Exception {
 		connection.stop();

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=911759&r1=911758&r2=911759&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Fri Feb 19 09:35:16 2010
@@ -33,8 +33,11 @@
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.util.Wait;
@@ -53,6 +56,7 @@
 	MessageProducer producer;
 	public ActiveMQDestination destination = new ActiveMQQueue("test");
     public boolean optimizedDispatch = true;
+    public PendingQueueMessageStoragePolicy pendingQueuePolicy;
 	
     public static Test suite() {
         return suite(ExpiredMessagesWithNoConsumerTest.class);
@@ -82,6 +86,8 @@
         defaultEntry.setOptimizedDispatch(optimizedDispatch );
         defaultEntry.setExpireMessagesPeriod(800);
         defaultEntry.setMaxExpirePageSize(800);
+        
+        defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
 
         if (memoryLimit) {
             // so memory is not consumed by DLQ turn if off
@@ -99,6 +105,7 @@
 		
     public void initCombosForTestExpiredMessagesWithNoConsumer() {
         addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()});
     }
     
 	public void testExpiredMessagesWithNoConsumer() throws Exception {
@@ -111,7 +118,7 @@
 		producer = session.createProducer(destination);
 		producer.setTimeToLive(1000);
 		connection.start();
-		final long sendCount = 2000;		
+		final long sendCount = 2000;
 		
 		final Thread producingThread = new Thread("Producing Thread") {
             public void run() {
@@ -154,6 +161,7 @@
                 + ", size= " + view.getQueueSize());
         
         assertEquals("All sent have expired", sendCount, view.getExpiredCount());
+        assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage());
 	}
     
 	// first ack delivered after expiry



Mime
View raw message