activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r673157 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/ft/
Date Tue, 01 Jul 2008 17:59:38 GMT
Author: rajdavies
Date: Tue Jul  1 10:59:37 2008
New Revision: 673157

URL: http://svn.apache.org/viewvc?rev=673157&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1585

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=673157&r1=673156&r2=673157&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Tue Jul  1 10:59:37 2008
@@ -280,11 +280,13 @@
     }
 
     /**
-     * Notifiy the Broker that a dispatch has happened
-     * 
+     * Notifiy the Broker that a dispatch will happen
+     * Do in 'pre' so that slave will avoid getting ack before dispatch
+     * similar logic to send() below.
      * @param messageDispatch
      */
-    public void postProcessDispatch(MessageDispatch messageDispatch) {
+    public void preProcessDispatch(MessageDispatch messageDispatch) {
+        super.preProcessDispatch(messageDispatch);
         MessageDispatchNotification mdn = new MessageDispatchNotification();
         mdn.setConsumerId(messageDispatch.getConsumerId());
         mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
@@ -294,7 +296,6 @@
             mdn.setMessageId(msg.getMessageId());
             sendAsyncToSlave(mdn);
         }
-        super.postProcessDispatch(messageDispatch);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=673157&r1=673156&r2=673157&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Jul  1 10:59:37 2008
@@ -197,7 +197,9 @@
                         if (!context.isInTransaction()) {
                             dequeueCounter++;
                             node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                            if (!isSlave()) {
+                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                            }
                             removeList.add(node);
                         } else {
                             // setup a Synchronization to remove nodes from the

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=673157&r1=673156&r2=673157&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jul  1 10:59:37 2008
@@ -1073,7 +1073,7 @@
     }
     
     public void wakeup() {
-        if (optimizedDispatch) {
+        if (optimizedDispatch || isSlave()) {
             iterate();
         }else {
             try {
@@ -1085,6 +1085,10 @@
     }
     
   
+    private boolean isSlave() {
+        return broker.getBrokerService().isSlave();
+    }
+
     private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
         List<QueueMessageReference> result = null;
         dispatchLock.lock();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=673157&r1=673156&r2=673157&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Tue Jul  1 10:59:37 2008
@@ -47,6 +47,7 @@
             File file = new File(".");
             System.setProperty("basedir", file.getAbsolutePath());
         }
+        super.messageCount = 500;
         failureCount = super.messageCount / 2;
         super.topic = isTopic();
         createMaster();



Mime
View raw message