activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r704142 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/advisory/
Date Mon, 13 Oct 2008 15:24:31 GMT
Author: gtully
Date: Mon Oct 13 08:24:31 2008
New Revision: 704142

URL: http://svn.apache.org/viewvc?rev=704142&view=rev
Log:
fix AMQ-1970 - pagedInMessages in slave was being filled due to more than 200(pageSize) unacked
messages and slave not modifying the inflight count which is used in the pageIn logic

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=704142&r1=704141&r2=704142&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Oct 13 08:24:31 2008
@@ -163,6 +163,7 @@
                             pending.remove();
                             createMessageDispatch(node, node.getMessage());
                             dispatched.add(node);
+                            onDispatch(node, node.getMessage());
                         }
                         return;
                     }
@@ -173,7 +174,7 @@
         }
         throw new JMSException(
                 "Slave broker out of sync with master: Dispatched message ("
-                        + mdn.getMessageId() + ") was not in the pending list");
+                        + mdn.getMessageId() + ") was not in the pending list for " + mdn.getDestination().getPhysicalName());
     }
 
     public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws
Exception {
@@ -205,9 +206,7 @@
                             if (!this.getConsumerInfo().isBrowser()) {
                                 node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                             }
-                            if (!isSlave()) {
-                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
-                            }
+                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                         } else {
                             // setup a Synchronization to remove nodes from the
                             // dispatched list.

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=704142&r1=704141&r2=704142&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
Mon Oct 13 08:24:31 2008
@@ -16,14 +16,26 @@
  */
 package org.apache.activemq.advisory;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.RegionBroker;
 
+
 public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
     String masterBindAddress = "tcp://localhost:61616";
     String slaveBindAddress = "tcp://localhost:62616";
     BrokerService slave;
-    
+
     /*
      * add a slave broker
      * @see org.apache.activemq.EmbeddedBrokerTestSupport#createBroker()
@@ -93,4 +105,46 @@
                 masterRb.getDestinationStatistics().getDispatched().getCount());
     }
     
+    public void testMoreThanPageSizeUnacked() throws Exception {
+        
+        final int messageCount = Queue.MAX_PAGE_SIZE + 10;
+        final CountDownLatch latch = new CountDownLatch(1);
+        
+        serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQSession s = (ActiveMQSession) serverSession;
+        s.setSessionAsyncDispatch(true);
+        
+        MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination);
+        serverConsumer.setMessageListener(new MessageListener() {
+           
+            public void onMessage(Message msg) {
+                try {
+                    latch.await(30L, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+        });
+            
+        MessageProducer producer = clientSession.createProducer(serverDestination);
+        for (int i =0; i< messageCount; i++) {
+            Message msg = clientSession.createMessage();
+            producer.send(msg);
+        }
+        
+        RegionBroker slaveRb = (RegionBroker) slave.getBroker().getAdaptor(
+                RegionBroker.class);
+        RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
+                RegionBroker.class);
+        
+        Thread.sleep(4000);
+        assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount());
       
+        assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(),
masterRb.getDestinationStatistics().getInflight().getCount());
+        
+        latch.countDown();
+        Thread.sleep(4000);
+        assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount());
       
+        assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(),
masterRb.getDestinationStatistics().getInflight().getCount());
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java?rev=704142&r1=704141&r2=704142&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
Mon Oct 13 08:24:31 2008
@@ -32,11 +32,11 @@
  * @version $Revision: 397249 $
  */
 public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
-    private Connection serverConnection;
-    private Session serverSession;
-    private Connection clientConnection;
-    private Session clientSession;
-    private Destination serverDestination;
+    protected Connection serverConnection;
+    protected Session serverSession;
+    protected Connection clientConnection;
+    protected Session clientSession;
+    protected Destination serverDestination;
     protected static final int COUNT = 2000;
 
     public void testLoadRequestReply() throws Exception {



Mime
View raw message