activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r634588 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Queue.java TempQueue.java policy/PolicyEntry.java
Date Fri, 07 Mar 2008 09:02:42 GMT
Author: rajdavies
Date: Fri Mar  7 01:02:37 2008
New Revision: 634588

URL: http://svn.apache.org/viewvc?rev=634588&view=rev
Log:
re-introduce optimize dispatch for queues

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=634588&r1=634587&r2=634588&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Mar  7 01:02:37 2008
@@ -78,6 +78,7 @@
  */
 public class Queue extends BaseDestination implements Task {
     protected final Log log;
+    protected TaskRunnerFactory taskFactory;
     protected TaskRunner taskRunner;    
     protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
     protected PendingMessageCursor messages;
@@ -93,6 +94,7 @@
     private boolean useConsumerPriority=true;
     private boolean strictOrderDispatch=false;
     private QueueDispatchSelector  dispatchSelector;
+    private boolean optimizedDispatch=false;
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
             wakeup();
@@ -109,11 +111,9 @@
     public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory) throws Exception {
         super(brokerService, store, destination, parentStats);
-        
-        
+        this.taskFactory=taskFactory;       
         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
         this.dispatchSelector=new QueueDispatchSelector(destination);
-       
     }
         
     public void initialize() throws Exception {
@@ -133,17 +133,20 @@
             memoryUsage.setParent(systemUsage.getMemoryUsage());
         }
         
-       
-        this.executor =  Executors.newSingleThreadExecutor(new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "QueueThread:"+destination);
-                thread.setDaemon(true);
-                thread.setPriority(Thread.NORM_PRIORITY);
-                return thread;
-            }
-        });
-           
-        this.taskRunner = new DeterministicTaskRunner(this.executor,this);
+        if (isOptimizedDispatch()) {
+            this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());
+        }else {
+            this.executor =  Executors.newSingleThreadExecutor(new ThreadFactory() {
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable, "QueueThread:"+destination);
+                    thread.setDaemon(true);
+                    thread.setPriority(Thread.NORM_PRIORITY);
+                    return thread;
+                }
+            });
+               
+            this.taskRunner = new DeterministicTaskRunner(this.executor,this);
+        }
         super.initialize();
         if (store != null) {
             // Restore the persistent messages.
@@ -583,6 +586,15 @@
     public void setStrictOrderDispatch(boolean strictOrderDispatch) {
         this.strictOrderDispatch = strictOrderDispatch;
     }
+    
+
+    public boolean isOptimizedDispatch() {
+        return optimizedDispatch;
+    }
+
+    public void setOptimizedDispatch(boolean optimizedDispatch) {
+        this.optimizedDispatch = optimizedDispatch;
+    }
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -956,10 +968,14 @@
     }
     
     protected void wakeup() {
-        try {
-            taskRunner.wakeup();
-        } catch (InterruptedException e) {
-            log.warn("Task Runner failed to wakeup ", e);
+        if (optimizedDispatch) {
+            iterate();
+        }else {
+            try {
+                taskRunner.wakeup();
+            } catch (InterruptedException e) {
+                log.warn("Task Runner failed to wakeup ", e);
+            }
         }
     }
 
@@ -1075,4 +1091,5 @@
     private void removeFromConsumerList(Subscription sub) {
         consumers.remove(sub);
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=634588&r1=634587&r2=634588&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Fri Mar  7 01:02:37 2008
@@ -32,7 +32,7 @@
  */
 public class TempQueue extends Queue{
     private final ActiveMQTempDestination tempDest;
-    private TaskRunnerFactory taskFactory;
+   
     
     /**
      * @param brokerService
@@ -48,7 +48,6 @@
             throws Exception {
         super(brokerService, destination, store, parentStats, taskFactory);
         this.tempDest = (ActiveMQTempDestination) destination;
-        this.taskFactory=taskFactory;
     }
     
     public void initialize() throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=634588&r1=634587&r2=634588&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Mar  7 01:02:37 2008
@@ -86,6 +86,7 @@
         queue.setMinimumMessageSize((int) getMinimumMessageSize());
         queue.setUseConsumerPriority(isUseConsumerPriority());
         queue.setStrictOrderDispatch(isStrictOrderDispatch());
+        queue.setOptimizedDispatch(isOptimizedDispatch());
     }
 
     public void configure(Topic topic) {



Mime
View raw message