activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r911650 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/usage/ main/java/org/apache/activemq/util/ test/java/org/apache/...
Date Thu, 18 Feb 2010 23:49:41 GMT
Author: gtully
Date: Thu Feb 18 23:49:41 2010
New Revision: 911650

URL: http://svn.apache.org/viewvc?rev=911650&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2610 - org.apache.activemq.broker.region.cursors.PendingMessageCursor.next()
now increments the reference count before returning a message reference. This allows callers
to control reference counts when browsing or peeking at messages rather than moving/removing them.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Feb 18 23:49:41 2010
@@ -164,6 +164,7 @@
                 pending.reset();
                 while (pending.hasNext()) {
                     MessageReference node = pending.next();
+                    node.decrementReferenceCount();
                     if (node.getMessageId().equals(mdn.getMessageId())) {
                         // Synchronize between dispatched list and removal of messages from
pending list
                         // related to remove subscription action
@@ -575,6 +576,7 @@
                             // related to remove subscription action
                             synchronized(dispatchLock) {
                                 pending.remove();
+                                node.decrementReferenceCount();
                                 if( !isDropped(node) && canDispatch(node)) {
 
                                     // Message may have been sitting in the pending

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Feb 18 23:49:41 2010
@@ -808,32 +808,34 @@
     }
 
     public Message[] browse() {
-        List<Message> l = new ArrayList<Message>();
-        doBrowse(l, getMaxBrowsePageSize());
-        return l.toArray(new Message[l.size()]);
+        List<Message> browseList = new ArrayList<Message>();
+        doBrowse(browseList, getMaxBrowsePageSize());
+        return browseList.toArray(new Message[browseList.size()]);
     }
 
-    public void doBrowse(List<Message> l, int max) {
+    public void doBrowse(List<Message> browseList, int max) {
         final ConnectionContext connectionContext = createConnectionContext();
         try {
             pageInMessages(false);
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
             synchronized (dispatchMutex) {
                 synchronized (pagedInPendingDispatch) {
-                    addAll(pagedInPendingDispatch, l, max, toExpire);
+                    addAll(pagedInPendingDispatch, browseList, max, toExpire);
                     for (MessageReference ref : toExpire) {
                         pagedInPendingDispatch.remove(ref);
                         if (broker.isExpired(ref)) {
+                            LOG.debug("expiring from pagedInPending: " + ref);
                             messageExpired(connectionContext, ref);
                         }
                     }
                 }
                 toExpire.clear();
                 synchronized (pagedInMessages) {
-                    addAll(pagedInMessages.values(), l, max, toExpire);
+                    addAll(pagedInMessages.values(), browseList, max, toExpire);
                 }
                 for (MessageReference ref : toExpire) {
                     if (broker.isExpired(ref)) {
+                        LOG.debug("expiring from pagedInMessages: " + ref);
                         messageExpired(connectionContext, ref);
                     } else {
                         synchronized (pagedInMessages) {
@@ -842,23 +844,25 @@
                     }
                 }
 
-                if (l.size() < getMaxBrowsePageSize()) {
+                if (browseList.size() < getMaxBrowsePageSize()) {
                     synchronized (messages) {
                         try {
                             messages.reset();
-                            while (messages.hasNext() && l.size() < max) {
+                            while (messages.hasNext() && browseList.size() < max)
{
                                 MessageReference node = messages.next();
                                 if (node.isExpired()) {
                                     if (broker.isExpired(node)) {
+                                        LOG.debug("expiring from messages: " + node);
                                         messageExpired(connectionContext, createMessageReference(node.getMessage()));
                                     }
                                     messages.remove();
                                 } else {
                                     messages.rollback(node.getMessageId());
-                                    if (l.contains(node.getMessage()) == false) {
-                                        l.add(node.getMessage());
+                                    if (browseList.contains(node.getMessage()) == false)
{
+                                        browseList.add(node.getMessage());
                                     }
                                 }
+                                node.decrementReferenceCount();
                             }
                         } finally {
                             messages.release();
@@ -897,6 +901,7 @@
                     while (messages.hasNext()) {
                         try {
                             MessageReference r = messages.next();
+                            r.decrementReferenceCount();
                             messages.rollback(r.getMessageId());
                             if (msgId.equals(r.getMessageId())) {
                                 Message m = r.getMessage();
@@ -1444,12 +1449,13 @@
                         messages.reset();
                         while (messages.hasNext() && count < toPageIn) {
                             MessageReference node = messages.next();
-                            node.incrementReferenceCount();
                             messages.remove();
                             QueueMessageReference ref = createMessageReference(node.getMessage());
                             if (ref.isExpired()) {
                                 if (broker.isExpired(ref)) {
                                     messageExpired(createConnectionContext(), ref);
+                                } else {
+                                    ref.decrementReferenceCount();
                                 }
                             } else {
                                 result.add(ref);
@@ -1467,6 +1473,8 @@
                         if (!pagedInMessages.containsKey(ref.getMessageId())) {
                             pagedInMessages.put(ref.getMessageId(), ref);
                             resultList.add(ref);
+                        } else {
+                            ref.decrementReferenceCount();
                         }
                     }
                 }
@@ -1657,7 +1665,6 @@
                         messages.reset();
                         while (messages.hasNext()) {
                             MessageReference node = messages.next();
-                            node.incrementReferenceCount();
                             messages.remove();
                             if (messageId.equals(node.getMessageId())) {
                                 message = this.createMessageReference(node.getMessage());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Feb 18 23:49:41 2010
@@ -161,6 +161,7 @@
             matched.reset();
             while (matched.hasNext()) {
                 MessageReference node = matched.next();
+                node.decrementReferenceCount();
                 if (broker.isExpired(node)) {
                     matched.remove();
                     dispatchedCounter.incrementAndGet();
@@ -181,6 +182,7 @@
                 matched.reset();
                 while (matched.hasNext()) {
                     MessageReference node = matched.next();
+                    node.decrementReferenceCount();
                     if (node.getMessageId().equals(mdn.getMessageId())) {
                         matched.remove();
                         dispatchedCounter.incrementAndGet();
@@ -384,8 +386,8 @@
                     matched.reset();
                    
                     while (matched.hasNext() && !isFull()) {
-                        MessageReference message = (MessageReference) matched
-                                .next();
+                        MessageReference message = (MessageReference) matched.next();
+                        message.decrementReferenceCount();
                         matched.remove();
                         // Message may have been sitting in the matched list a
                         // while

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Thu Feb 18 23:49:41 2010
@@ -151,6 +151,9 @@
             result = this.iterator.next().getValue();
         }
         last = result;
+        if (result != null) {
+            result.incrementReferenceCount();
+        }
         return result;
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Thu Feb 18 23:49:41 2010
@@ -162,6 +162,7 @@
         int count = 0;
         for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() &&
count < maxItems;) {
             MessageReference ref = i.next();
+            ref.incrementReferenceCount();
             result.add(ref);
             count++;
         }
@@ -282,8 +283,8 @@
             // got from disk
             message.setRegionDestination(regionDestination);
             message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
-            message.incrementReferenceCount();
         }
+        message.incrementReferenceCount();
         return message;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Thu Feb 18 23:49:41 2010
@@ -108,7 +108,7 @@
     boolean hasNext();
 
     /**
-     * @return the next pending message
+     * @return the next pending message with its reference count incremented
      */
     MessageReference next();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Thu Feb 18 23:49:41 2010
@@ -117,6 +117,9 @@
      */
     public synchronized MessageReference next() {
         last = (MessageReference)iter.next();
+        if (last != null) {
+            last.incrementReferenceCount();
+        }
         return last;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu Feb
18 23:49:41 2010
@@ -407,7 +407,7 @@
     }
     
     static {
-        executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+        executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, "Usage Async Task");
                 thread.setDaemon(true);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
Thu Feb 18 23:49:41 2010
@@ -26,13 +26,14 @@
      * track the stack trace of callers
      * @param name the method being tracked
      */
-    public static void track(String name) {
+    public static void track(final String name) {
         Tracker t;
+        final String key = name.intern();
         synchronized(trackers) {
-            t = trackers.get(name);
+            t = trackers.get(key);
             if (t == null) {
                 t = new Tracker();
-                trackers.put(name, t);
+                trackers.put(key, t);
             }
         }
         t.track();
@@ -56,23 +57,30 @@
 @SuppressWarnings("serial")
 class Trace extends Throwable {
     public int count = 1;
-    public final int size;
+    public final long id;
     Trace() {
         super();
-        size = this.getStackTrace().length;
+        id = calculateIdentifier();
+    }
+    private long calculateIdentifier() {
+        int len = 0;
+        for (int i=0; i<this.getStackTrace().length; i++) {
+            len += this.getStackTrace()[i].toString().intern().hashCode();
+        }
+        return len;
     }
 }
 
 @SuppressWarnings("serial")
-class Tracker extends HashMap<Integer, Trace> {
+class Tracker extends HashMap<Long, Trace> {
     public void track() {
         Trace current = new Trace();
         synchronized(this) {
-            Trace exist = get(current.size);
+            Trace exist = get(current.id);
             if (exist != null) {
                 exist.count++;
             } else {
-                put(current.size, current);
+                put(current.id, current);
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java Thu Feb
18 23:49:41 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq;
 
 import java.io.File;
+import java.util.Map;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -25,6 +26,11 @@
 import javax.jms.TextMessage;
 
 import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -139,4 +145,32 @@
             recursiveDelete(new File(System.getProperty("derby.system.home")));
         }
     }
+    
+    public static DestinationStatistics getDestinationStatistics(BrokerService broker, ActiveMQDestination
destination) {
+        DestinationStatistics result = null;
+        org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
+        if (dest != null) {
+            result = dest.getDestinationStatistics();
+        }
+        return result;
+    }
+    
+    public static org.apache.activemq.broker.region.Destination getDestination(BrokerService
target, ActiveMQDestination destination) {
+        org.apache.activemq.broker.region.Destination result = null;
+        for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target,
destination).values()) {
+            if (dest.getName().equals(destination.getPhysicalName())) {
+                result = dest;
+                break;
+            }
+        }
+        return result;
+    }
+
+    private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination>
getDestinationMap(BrokerService target,
+            ActiveMQDestination destination) {
+        RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
+        return destination.isQueue() ?
+                    regionBroker.getQueueRegion().getDestinationMap() :
+                        regionBroker.getTopicRegion().getDestinationMap();
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
Thu Feb 18 23:49:41 2010
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.broker;
 
+import javax.jms.JMSException;
+
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.ThreadTracker;
+
 public class TopicSubscriptionTest extends QueueSubscriptionTest {
 
     protected void setUp() throws Exception {
@@ -23,6 +29,11 @@
         durable = true;
         topic = true;
     }
+    
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        ThreadTracker.result();
+    }
 
     public void testManyProducersManyConsumers() throws Exception {
         consumerCount = 40;
@@ -34,6 +45,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
@@ -46,6 +58,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
@@ -58,6 +71,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception
{
@@ -82,6 +96,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerManyConsumersFewMessages() throws Exception {
@@ -94,6 +109,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
     public void testOneProducerManyConsumersManyMessages() throws Exception {
@@ -106,6 +122,7 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
 
 
@@ -119,5 +136,12 @@
         doMultipleClientsTest();
 
         assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
+        assertDestinationMemoryUsageGoesToZero();
     }
+    
+    private void assertDestinationMemoryUsageGoesToZero() throws Exception {
+        assertEquals("destination memory is back to 0", 0, 
+                TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
Thu Feb 18 23:49:41 2010
@@ -24,35 +24,38 @@
 import java.util.Date;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import junit.framework.TestCase;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.usage.SystemUsage;
 
-import junit.framework.TestCase;
-
 public class StoreBasedCursorTest extends TestCase {
     protected String bindAddress = "tcp://localhost:60706";
     BrokerService broker;
-    ConnectionFactory factory;
+    ActiveMQConnectionFactory factory;
     Connection connection;
     Session session;
     Queue queue;
     int messageSize = 1024;
-    int memoryLimit = 5 * messageSize;
+    // actual message is messageSize*2, and 4*MessageSize would allow 2 messages be delivered,
but the flush of the cache is async so the flush
+    // triggered on 2nd message maxing out the usage may not be in effect for the 3rd message
to succeed. Making the memory usage more lenient
+    // gives the usageChange listener in the cursor an opportunity to kick in.
+    int memoryLimit = 12 * messageSize;
     
     protected void setUp() throws Exception {
         super.setUp();
         if (broker == null) {
             broker = new BrokerService();
+            broker.setAdvisorySupport(false);
         }
     }
 
@@ -67,6 +70,7 @@
     protected void start() throws Exception {
         broker.start();
         factory = new ActiveMQConnectionFactory("vm://localhost?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
         connection = factory.createConnection();
         connection.start();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
Thu Feb 18 23:49:41 2010
@@ -99,6 +99,7 @@
         underTest.reset();
         while (underTest.hasNext() && dequeueCount < count) {
             MessageReference ref = underTest.next();
+            ref.decrementReferenceCount();
             underTest.remove();
             assertEquals(dequeueCount++, ref.getMessageId()
                     .getProducerSequenceId());

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Thu Feb 18 23:49:41 2010
@@ -43,6 +43,9 @@
 import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import static org.apache.activemq.TestSupport.getDestination;
+import static org.apache.activemq.TestSupport.getDestinationStatistics;
+
 
 public class ExpiredMessagesTest extends CombinationTestSupport {
 
@@ -124,7 +127,7 @@
         producingThread.join();
         session.close();
         
-        final DestinationStatistics view = this.getDestinationStatistics(destination);
+        final DestinationStatistics view = getDestinationStatistics(broker, destination);
 
         // wait for all to inflight to expire
         assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
@@ -165,7 +168,7 @@
         final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
         final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
         
-        final DestinationStatistics dlqView = getDestinationStatistics(dlqDestination);
+        final DestinationStatistics dlqView = getDestinationStatistics(broker, dlqDestination);
         LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: "
+ dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount()
                 + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: "
+ dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount());
         
@@ -177,8 +180,8 @@
         assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount());
         
         // memory check
-        assertEquals("memory usage is back to duck egg", 0, this.getDestination(destination).getMemoryUsage().getPercentUsage());
-        assertTrue("memory usage is increased ", 0 < this.getDestination(dlqDestination).getMemoryUsage().getPercentUsage());
   
+        assertEquals("memory usage is back to duck egg", 0, getDestination(broker, destination).getMemoryUsage().getPercentUsage());
+        assertTrue("memory usage is increased ", 0 < getDestination(broker, dlqDestination).getMemoryUsage().getPercentUsage());
   
         
         // verify DLQ
         MessageConsumer dlqConsumer = createDlqConsumer(connection);
@@ -243,7 +246,7 @@
         producingThread.start();
         producingThread.join();
 
-        DestinationStatistics view = getDestinationStatistics(destination);
+        DestinationStatistics view = getDestinationStatistics(broker, destination);
         LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " 
                 + view.getEnqueues().getCount() + ", dequeues: "
                 + view.getDequeues().getCount() + ", dispatched: "
@@ -263,7 +266,7 @@
         
         Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
-                DestinationStatistics view = getDestinationStatistics(destination);
+                DestinationStatistics view = getDestinationStatistics(broker, destination);
                 LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
                         + view.getEnqueues().getCount() + ", dequeues: "
                         + view.getDequeues().getCount() + ", dispatched: "
@@ -275,7 +278,7 @@
             }
         });
         
-        view = getDestinationStatistics(destination);
+        view = getDestinationStatistics(broker, destination);
         assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
         assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
     }
@@ -305,26 +308,7 @@
         return broker;
 	}
     
-    private DestinationStatistics getDestinationStatistics(ActiveMQDestination destination) {
-        DestinationStatistics result = null;
-        org.apache.activemq.broker.region.Destination dest = getDestination(destination);
-        if (dest != null) {
-            result = dest.getDestinationStatistics();
-        }
-        return result;
-    }
     
-    private org.apache.activemq.broker.region.Destination getDestination(ActiveMQDestination destination) {
-        org.apache.activemq.broker.region.Destination result = null;
-        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
-        for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
-            if (dest.getName().equals(destination.getPhysicalName())) {
-                result = dest;
-                break;
-            }
-        }
-        return result;
-    }
 
 	protected void tearDown() throws Exception {
 		connection.stop();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=911650&r1=911649&r2=911650&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Thu Feb 18 23:49:41 2010
@@ -33,8 +33,11 @@
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.util.Wait;
@@ -53,6 +56,7 @@
 	MessageProducer producer;
 	public ActiveMQDestination destination = new ActiveMQQueue("test");
     public boolean optimizedDispatch = true;
+    public PendingQueueMessageStoragePolicy pendingQueuePolicy;
 	
     public static Test suite() {
         return suite(ExpiredMessagesWithNoConsumerTest.class);
@@ -82,6 +86,8 @@
         defaultEntry.setOptimizedDispatch(optimizedDispatch );
         defaultEntry.setExpireMessagesPeriod(800);
         defaultEntry.setMaxExpirePageSize(800);
+        
+        defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
 
         if (memoryLimit) {
             // so memory is not consumed by DLQ turn if off
@@ -99,6 +105,7 @@
 		
     public void initCombosForTestExpiredMessagesWithNoConsumer() {
         addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()});
     }
     
 	public void testExpiredMessagesWithNoConsumer() throws Exception {
@@ -111,7 +118,7 @@
 		producer = session.createProducer(destination);
 		producer.setTimeToLive(1000);
 		connection.start();
-		final long sendCount = 2000;		
+		final long sendCount = 2000;
 		
 		final Thread producingThread = new Thread("Producing Thread") {
             public void run() {
@@ -154,6 +161,7 @@
                 + ", size= " + view.getQueueSize());
         
         assertEquals("All sent have expired", sendCount, view.getExpiredCount());
+        assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage());
 	}
     
 	// first ack delivered after expiry

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java?rev=911650&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java Thu Feb 18 23:49:41 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.Before;
+import org.junit.Test;
+
+public class UnlimitedEnqueueTest  {
+
+    BrokerService brokerService = null;
+    final long numMessages = 50000;
+    final long numThreads = 10;
+
+    @Test
+    public void testEnqueueIsOnlyLimitedByDisk() throws Exception {
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i=0; i<numThreads; i++) {
+            executor.execute(new Producer(numMessages/numThreads));
+        }
+        
+        executor.shutdown();
+        executor.awaitTermination(30*60, TimeUnit.SECONDS);
+    }
+    
+    @Before
+    public void createBrokerService() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setAdvisorySupport(false);
+        
+        // optional, reduce the usage limit so that spooling will occur faster
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(10 * 1024 * 1024);
+        PolicyMap policyMap = new PolicyMap();
+        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+        PolicyEntry policy = new PolicyEntry();
+        
+        // NB: ensure queue cursor limit is below the default 70% usage that the destination will use
+        // if they are the same, the queue memory limit and flow control will kick in first
+        policy.setCursorMemoryHighWaterMark(20);
+        
+        // on by default
+        //policy.setProducerFlowControl(true);
+        policy.setQueue(">");
+        
+        // policy that will spool references to disk
+        policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
+        entries.add(policy);
+        policyMap.setPolicyEntries(entries);
+        brokerService.setDestinationPolicy(policyMap);
+        
+        brokerService.start();
+    }
+    
+    public class Producer implements Runnable{
+
+        private final long numberOfMessages;
+
+        public Producer(final long n){
+            this.numberOfMessages = n;
+        }
+
+        public void run(){
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+            try {
+                Connection conn = factory.createConnection();
+                conn.start();
+                for (int i = 0; i < numberOfMessages; i++) {
+                    Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
+                    Destination destination = session.createQueue("test-queue");
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                    BytesMessage message = session.createBytesMessage();
+                    byte[] bytes = new byte[1024*10];
+                    message.writeBytes(bytes);
+                    try {
+                        producer.send(message);
+                    } catch (ResourceAllocationException e) {
+                        e.printStackTrace();
+                    }
+                    session.close();
+                }
+            } catch (JMSException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/UnlimitedEnqueueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message