activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r812214 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Date Mon, 07 Sep 2009 16:08:06 GMT
Author: gtully
Date: Mon Sep  7 16:08:06 2009
New Revision: 812214

URL: http://svn.apache.org/viewvc?rev=812214&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1112 - resolve issues with optimizedDispatch,
no need for message expiry to respect optimizeDispatch - iterate can always use the task runner
in the expiry case, similar to usage change

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=812214&r1=812213&r2=812214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Sep  7 16:08:06 2009
@@ -1266,7 +1266,7 @@
         } catch (IOException e) {
             LOG.error("Failed to remove expired Message from the store ",e);
         }
-        wakeup();
+        asyncWakeup();
     }
     
     protected ConnectionContext createConnectionContext() {
@@ -1297,15 +1297,18 @@
     public void wakeup() {
         if (optimizedDispatch || isSlave()) {
             iterate();
-        }else {
-            try {
-                taskRunner.wakeup();
-            } catch (InterruptedException e) {
-                LOG.warn("Task Runner failed to wakeup ", e);
-            }
+        } else {
+            asyncWakeup();
         }
     }
     
+    public void asyncWakeup() {
+        try {
+            this.taskRunner.wakeup();
+        } catch (InterruptedException e) {
+            LOG.warn("Async task tunner failed to wakeup ", e);
+        }
+    }
   
     private boolean isSlave() {
         return broker.getBrokerService().isSlave();
@@ -1371,6 +1374,7 @@
     }
 
     private void doDispatch(List<QueueMessageReference> list) throws Exception {
+        boolean doWakeUp = false;
         synchronized(dispatchMutex) {
        
             synchronized (pagedInPendingDispatch) {
@@ -1391,11 +1395,14 @@
                                 pagedInPendingDispatch.add(qmr);
                             }
                         }
-                        wakeup();
+                        doWakeUp  = true;
                     }
                 }
             }
         } 
+        if (doWakeUp) {
+            wakeup();
+        }
     }
     
     /**
@@ -1603,11 +1610,7 @@
 
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage) {
-            try {
-                this.taskRunner.wakeup();
-            } catch (InterruptedException e) {
-                LOG.warn(getName() + " failed to wakeup task runner on usageChange: " + e);
-            }
+            asyncWakeup();
         }
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=812214&r1=812213&r2=812214&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Mon Sep  7 16:08:06 2009
@@ -76,7 +76,7 @@
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
         // TODO Optimize dispatch makes this test hang
-        //defaultEntry.setOptimizedDispatch(true);
+        defaultEntry.setOptimizedDispatch(true);
         defaultEntry.setExpireMessagesPeriod(100);
         defaultEntry.setMaxExpirePageSize(800);
 



Mime
View raw message