activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r650763 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/ test/java/org/apache/activemq/broker/jmx/
Date Wed, 23 Apr 2008 06:58:12 GMT
Author: rajdavies
Date: Tue Apr 22 23:57:51 2008
New Revision: 650763

URL: http://svn.apache.org/viewvc?rev=650763&view=rev
Log:
Fixes for https://issues.apache.org/activemq/browse/AMQ-1678

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=650763&r1=650762&r2=650763&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Tue Apr 22 23:57:51 2008
@@ -30,7 +30,7 @@
  * @version $Revision: 1.12 $
  */
 public abstract class BaseDestination implements Destination {
-
+    public static final int DEFAULT_PAGE_SIZE=100;
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;
@@ -40,7 +40,7 @@
     private int maxProducersToAudit=1024;
     private int maxAuditDepth=2048;
     private boolean enableAudit=true;
-    private int maxPageSize=100;
+    private int maxPageSize=DEFAULT_PAGE_SIZE;
     private boolean useCache=true;
     private int minimumMessageSize=1024;
     private boolean lazyDispatch=false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=650763&r1=650762&r2=650763&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Apr 22 23:57:51 2008
@@ -23,14 +23,14 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.locks.ReentrantLock;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -69,6 +69,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
  * subscriptions.
@@ -749,27 +750,27 @@
      */
     public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages)
throws Exception {
         int movedCounter = 0;
-        int count = 0;
+        Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         ConnectionContext context = createConnectionContext();
-        List<MessageReference> list = null;
         do {
             pageInMessages();
             synchronized (pagedInMessages) {
-                list = new ArrayList<MessageReference>(pagedInMessages.values());
+                set.addAll(pagedInMessages.values());
             }
+            List <MessageReference>list = new ArrayList<MessageReference>(set);
             for (MessageReference ref : list) {
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
 
                     removeMessage(context, r);
+                    set.remove(r);
                     if (++movedCounter >= maximumMessages
                             && maximumMessages > 0) {
                         return movedCounter;
                     }
                 }
-                count++;
             }
-        } while (count < this.destinationStatistics.getMessages().getCount());
+        } while (set.size() < this.destinationStatistics.getMessages().getCount());
         return movedCounter;
     }
 
@@ -808,16 +809,21 @@
     public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception {
         int movedCounter = 0;
         int count = 0;
-        List<MessageReference> list = null;
+        Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         do {
+            int oldMaxSize=getMaxPageSize();
+            setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
             pageInMessages();
+            setMaxPageSize(oldMaxSize);
             synchronized (pagedInMessages) {
-                list = new ArrayList<MessageReference>(pagedInMessages.values());
+                set.addAll(pagedInMessages.values());
             }
+            List <MessageReference>list = new ArrayList<MessageReference>(set);
             for (MessageReference ref : list) {
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
-                    r.incrementReferenceCount();
+                    
+                    r.incrementReferenceCount();                    
                     try {
                         Message m = r.getMessage();
                         BrokerSupport.resend(context, m, dest);
@@ -865,14 +871,14 @@
      */
     public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter,
ActiveMQDestination dest,int maximumMessages) throws Exception {
         int movedCounter = 0;
-        int count = 0;
-        List<MessageReference> list = null;
+        Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         do {
             pageInMessages();
             synchronized (pagedInMessages) {
-                list = new ArrayList<MessageReference>(pagedInMessages.values());
+                set.addAll(pagedInMessages.values());
             }
-            for (MessageReference ref : list) {
+            List <MessageReference>list = new ArrayList<MessageReference>(set);
+            for (MessageReference ref:list) {
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
                     // We should only move messages that can be locked.
@@ -881,6 +887,7 @@
                         Message m = r.getMessage();
                         BrokerSupport.resend(context, m, dest);
                         removeMessage(context, r);
+                        set.remove(r);
                         if (++movedCounter >= maximumMessages
                                 && maximumMessages > 0) {
                             return movedCounter;
@@ -889,9 +896,9 @@
                         r.decrementReferenceCount();
                     }
                 }
-                count++;
+                
             }
-        } while (count < this.destinationStatistics.getMessages().getCount());
+        } while (set.size() < this.destinationStatistics.getMessages().getCount());
         return movedCounter;
     }
     
@@ -1065,12 +1072,12 @@
             }
         }
     }
-
+    
+  
     private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
         List<QueueMessageReference> result = null;
         dispatchLock.lock();
         try{
-        
             int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount())
- pagedInMessages.size();
             if (isLazyDispatch()&& !force) {
              // Only page in the minimum number of messages which can be dispatched immediately.

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java?rev=650763&r1=650762&r2=650763&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java
Tue Apr 22 23:57:51 2008
@@ -43,7 +43,6 @@
      * @throws Exception
      */
     public void testReceiveBrowseReceive() throws Exception {
-
         Session session = connection.createSession(false, 0);
         ActiveMQQueue destination = new ActiveMQQueue("TEST");
         MessageProducer producer = session.createProducer(destination);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=650763&r1=650762&r2=650763&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Tue Apr 22 23:57:51 2008
@@ -18,7 +18,6 @@
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
-
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -29,11 +28,10 @@
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
-
 import junit.textui.TestRunner;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.advisory.TempDestDeleteTest;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -56,7 +54,7 @@
     protected Connection connection;
     protected boolean transacted;
     protected int authMode = Session.AUTO_ACKNOWLEDGE;
-    protected int messageCount = 10;
+    protected static final int MESSAGE_COUNT = 2*BaseDestination.DEFAULT_PAGE_SIZE;
 
     /**
      * When you run this test case from the command line it will pause before
@@ -93,8 +91,8 @@
         queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination="
+ newDestination + ",BrokerName=localhost");
 
         queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
-
-        assertTrue("Should have at least one message in the queue: " + queueViewMBeanName,
queue.getQueueSize() > 0);
+        int movedSize = MESSAGE_COUNT-3;
+        assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
 
         // now lets remove them by selector
         queue.removeMatchingMessages("counter > 2");
@@ -114,16 +112,14 @@
         long queueSize = queue.getQueueSize();
         queue.copyMatchingMessagesTo("counter > 2", newDestination);
 
-        assertEquals("Should have same number of messages in the queue: " + queueViewMBeanName,
queueSize, queueSize);
+        
 
         queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination="
+ newDestination + ",BrokerName=localhost");
 
         queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
 
         LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + "
message(s)");
-
-        assertTrue("Should have at least one message in the queue: " + queueViewMBeanName,
queue.getQueueSize() > 0);
-
+        assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3,
queue.getQueueSize());
         // now lets remove them by selector
         queue.removeMatchingMessages("counter > 2");
 
@@ -165,20 +161,20 @@
         echo("Found tabular data: " + table);
         assertTrue("Table should not be empty!", table.size() > 0);
 
-        assertEquals("Queue size", 10, proxy.getQueueSize());
+        assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize());
 
         String messageID = messageIDs[0];
         String newDestinationName = "queue://dummy.test.cheese";
         echo("Attempting to copy: " + messageID + " to destination: " + newDestinationName);
         proxy.copyMessageTo(messageID, newDestinationName);
 
-        assertEquals("Queue size", 10, proxy.getQueueSize());
+        assertEquals("Queue size", MESSAGE_COUNT, proxy.getQueueSize());
 
         messageID = messageIDs[1];
         echo("Attempting to remove: " + messageID);
         proxy.removeMessage(messageID);
 
-        assertEquals("Queue size", 9, proxy.getQueueSize());
+        assertEquals("Queue size", MESSAGE_COUNT-1, proxy.getQueueSize());
 
         echo("Worked!");
     }
@@ -296,8 +292,9 @@
 
     protected BrokerService createBroker() throws Exception {
         BrokerService answer = new BrokerService();
+        answer.setDeleteAllMessagesOnStartup(true);
         answer.setUseJmx(true);
-        answer.setEnableStatistics(true);
+        //answer.setEnableStatistics(true);
         answer.setPersistent(false);
         answer.addConnector(bindAddress);
         return answer;
@@ -309,7 +306,7 @@
         Session session = connection.createSession(transacted, authMode);
         destination = createDestination();
         MessageProducer producer = session.createProducer(destination);
-        for (int i = 0; i < messageCount; i++) {
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
             Message message = session.createTextMessage("Message: " + i);
             message.setIntProperty("counter", i);
             producer.send(message);



Mime
View raw message