activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r615287 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Date Fri, 25 Jan 2008 17:52:54 GMT
Author: rajdavies
Date: Fri Jan 25 09:52:54 2008
New Revision: 615287

URL: http://svn.apache.org/viewvc?rev=615287&view=rev
Log:
use task runner to dispatch messages - improve concurrency

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=615287&r1=615286&r2=615287&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Jan 25 09:52:54 2008
@@ -51,7 +51,6 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
@@ -88,10 +87,7 @@
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
-            try {
-                taskRunner.wakeup();
-            } catch (InterruptedException e) {
-            }
+            wakeup();
         };
     };
     
@@ -878,7 +874,7 @@
      */
     public boolean iterate() {
 
-        while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
+        while (!messagesWaitingForSpace.isEmpty() &&!memoryUsage.isFull()) {
             Runnable op = messagesWaitingForSpace.removeFirst();
             op.run();
         }
@@ -930,11 +926,7 @@
         synchronized(pagedInMessages) {
             pagedInMessages.remove(reference.getMessageId());
         }
-        try {
-            taskRunner.wakeup();
-        } catch (InterruptedException e) {
-            log.warn("Task Runner failed to wakeup ", e);
-        }
+        wakeup();
     }
 
     protected boolean lockMessage(IndirectMessageReference r) {
@@ -953,7 +945,15 @@
         }
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
-        pageInMessages(false);
+        wakeup();
+    }
+    
+    final void wakeup() {
+        try {
+            taskRunner.wakeup();
+        } catch (InterruptedException e) {
+            log.warn("Task Runner failed to wakeup ", e);
+        }
     }
 
     
@@ -1020,5 +1020,7 @@
     private void pageInMessages(boolean force) throws Exception {
             doDispatch(doPageIn(force));
     }
+    
+    
 
 }



Mime
View raw message