activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r887117 [2/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/...
Date Fri, 04 Dec 2009 08:57:21 GMT
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
(added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
Fri Dec  4 08:56:50 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.QueueSupport;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ThreadDispatchQueue implements DispatchQueue {
+
+    private final String label;
+    private final DispatcherThread dispatcher;
+    private final DispatchQueuePriority priority;
+    
+    public ThreadDispatchQueue(DispatcherThread dispatcher, DispatchQueuePriority priority) {
+        this.priority = priority;
+        this.label=priority.toString()+" "+dispatcher.getName();
+        this.dispatcher = dispatcher;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void dispatchAsync(Runnable runnable) {
+        dispatcher.execute(runnable, priority.ordinal());
+    }
+
+    public void dispatchAfter(long delayMS, Runnable runnable) {
+        dispatcher.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
+    }
+
+    public void dispatchSync(final Runnable runnable) throws InterruptedException {
+        dispatchApply(1, runnable);
+    }
+    
+    public void dispatchApply(int iterations, final Runnable runnable) throws InterruptedException {
+        QueueSupport.dispatchApply(this, iterations, runnable);
+    }
+
+    public void resume() {
+        throw new UnsupportedOperationException();
+    }
+
+    public void suspend() {
+        throw new UnsupportedOperationException();
+    }
+
+    public <Context> Context getContext() {
+        throw new UnsupportedOperationException();
+    }
+
+    public <Context> void setContext(Context context) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void setFinalizer(Runnable finalizer) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void setTargetQueue(DispatchQueue queue) {
+        throw new UnsupportedOperationException();
+    }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
Fri Dec  4 08:56:50 2009
@@ -19,14 +19,16 @@
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.activemq.dispatch.DispatchSystem;
+
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 final public class Dispatcher extends Thread {
-    private final SimpleDispatchSystem system;
+    private final SimpleDispatchSPI system;
     
-    public Dispatcher(SimpleDispatchSystem javaDispatchSystem, int ordinal) {
+    public Dispatcher(SimpleDispatchSPI javaDispatchSystem, int ordinal) {
         system = javaDispatchSystem;
         setName("dispatcher:"+(ordinal+1));
         setDaemon(true);
@@ -38,7 +40,7 @@
         GlobalDispatchQueue[] dispatchQueues = system.globalQueues;
         while( true ) {
             for (GlobalDispatchQueue queue : dispatchQueues) {
-                SimpleDispatchSystem.CURRENT_QUEUE.set(queue);
+                DispatchSystem.CURRENT_QUEUE.set(queue);
                 ConcurrentLinkedQueue<Runnable> runnables = queue.runnables;
                 Runnable runnable;
                 while( (runnable = runnables.poll())!=null ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
Fri Dec  4 08:56:50 2009
@@ -20,6 +20,7 @@
 
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.QueueSupport;
 
 /**
  * 
@@ -27,11 +28,11 @@
  */
 public class GlobalDispatchQueue implements DispatchQueue {
 
-    private final SimpleDispatchSystem system;
+    private final SimpleDispatchSPI system;
     final String label;
     final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
     
-    public GlobalDispatchQueue(SimpleDispatchSystem system, DispatchQueuePriority priority) {
+    public GlobalDispatchQueue(SimpleDispatchSPI system, DispatchQueuePriority priority) {
         this.system = system;
         this.label=priority.toString();
     }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
(from r886965, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java&r1=886965&r2=887117&rev=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
Fri Dec  4 08:56:50 2009
@@ -22,6 +22,8 @@
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSource;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchSPI;
+import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
 import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
 
@@ -31,10 +33,8 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class SimpleDispatchSystem {
-    
-    static final ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
-    
+public class SimpleDispatchSPI extends DispatchSPI {
+        
     final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
     final GlobalDispatchQueue globalQueues[]; 
     final Dispatcher dispatchers[];
@@ -42,7 +42,7 @@
     private final Object wakeupMutex = new Object();
     final AtomicLong globalQueuedRunnables = new AtomicLong();
     
-    public SimpleDispatchSystem(int size) {
+    public SimpleDispatchSPI(int size) {
         globalQueues = new GlobalDispatchQueue[3];
         for (int i = 0; i < 3; i++) {
             globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i] );
@@ -70,10 +70,6 @@
         return rc;
     }
     
-    public DispatchQueue getCurrentQueue() {
-        return CURRENT_QUEUE.get();
-    }
-    
     public void dispatchMain() {
         mainQueue.run();
     }
@@ -81,7 +77,7 @@
     public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
         return null;
     }
-
+    
     public void waitForWakeup() throws InterruptedException {
         while( globalQueuedRunnables.get()==0 ) {
             synchronized(wakeupMutex) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Fri Dec  4 08:56:50 2009
@@ -18,8 +18,9 @@
 
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.activemq.dispatch.internal.advanced.AdancedDispatchSystem;
-import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSystem;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSPI;
 
 import static java.lang.String.*;
 import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
@@ -31,22 +32,22 @@
 public class DispatchSystemTest {
     
     public static void main(String[] args) throws Exception {
-        AdancedDispatchSystem advancedSystem = new AdancedDispatchSystem(Runtime.getRuntime().availableProcessors());
-        benchmark("advanced private serial queue", advancedSystem.createQueue("test"));
-        benchmark("advanced global queue", advancedSystem.getGlobalQueue(DEFAULT));
-
-        SimpleDispatchSystem simpleSystem = new SimpleDispatchSystem(Runtime.getRuntime().availableProcessors());
-        benchmark("advanced private serial queue", simpleSystem.createQueue("test"));
-        benchmark("advancedglobal queue", simpleSystem.getGlobalQueue(DEFAULT));
+        DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
+        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
+        benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
+
+        DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
+        benchmark("simple private serial queue", simpleSystem, simpleSystem.createQueue("test"));
+        benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
     }
 
-    private static void benchmark(String name, DispatchQueue queue) throws InterruptedException {
+    private static void benchmark(String name, DispatchSPI spi, DispatchQueue queue) throws InterruptedException {
         // warm the JIT up..
-        benchmarkWork(queue, 100000);
+        benchmarkWork(spi, queue, 100000);
         
         int iterations = 1000*1000*20;
         long start = System.nanoTime();
-        benchmarkWork(queue, iterations);
+        benchmarkWork(spi, queue, iterations);
         long end = System.nanoTime();
         
         double durationMS = 1.0d*(end-start)/1000000d;
@@ -55,13 +56,13 @@
         System.out.println(format("name: %s, duration: %,.3f ms, rate: %,.2f executions/sec", name, durationMS, rate));
     }
 
-    private static void benchmarkWork(final DispatchQueue queue, int iterations) throws InterruptedException {
+    private static void benchmarkWork(final DispatchSPI spi, final DispatchQueue queue, int iterations) throws InterruptedException {
         final CountDownLatch counter = new CountDownLatch(iterations);
         Runnable task = new Runnable(){
             public void run() {
                 counter.countDown();
                 if( counter.getCount()>0 ) {
-                    queue.dispatchAsync(this);
+                    DispatchSystem.getCurrentQueue().dispatchAsync(this);
                 }
             }
         };

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
Fri Dec  4 08:56:50 2009
@@ -22,8 +22,8 @@
 import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.activemq.apollo.broker.Broker;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 
 @XmlRootElement(name="dispatcher")
 @XmlAccessorType(XmlAccessType.FIELD)
@@ -36,12 +36,12 @@
 	@XmlAttribute(required=false)
 	int threads = Runtime.getRuntime().availableProcessors();
 	
-	public IDispatcher createDispatcher(BrokerXml brokerXml) {
+	public Dispatcher createDispatcher(BrokerXml brokerXml) {
 		if( name == null ) {
 //			VirtualHostXml vh = brokerXml.getDefaultVirtualHost();
 			name = "Broker: ";
 		}
-		return PriorityDispatcher.createPriorityDispatchPool(name, maxPriority, threads);
+		return DispatcherThread.createPriorityDispatchPool(name, maxPriority, threads);
 	}
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
Fri Dec  4 08:56:50 2009
@@ -26,7 +26,7 @@
 import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerFactory;
 import org.apache.activemq.broker.store.memory.MemoryStore;
-import org.apache.activemq.dispatch.internal.advanced.AbstractPooledDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.Test;
@@ -44,7 +44,7 @@
 		LOG.info("Loading broker configuration from the classpath with URI: " + uri);
 		Broker broker = BrokerFactory.createBroker(uri);
 		
-		AbstractPooledDispatcher p = (AbstractPooledDispatcher)broker.getDispatcher();
+		DispatcherPool p = (DispatcherPool)broker.getDispatcher();
 		assertEquals(4, p.getSize());
 		assertEquals("test dispatcher", p.getName());
 		

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
Fri Dec  4 08:56:50 2009
@@ -39,10 +39,10 @@
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -66,7 +66,7 @@
 
     private static int PERFORMANCE_SAMPLES = 5;
 
-    IDispatcher dispatcher;
+    Dispatcher dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
@@ -83,11 +83,11 @@
     protected ArrayList<Producer> producers = new ArrayList<Producer>();
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
-    protected IDispatcher createDispatcher() {
+    protected Dispatcher createDispatcher() {
         if (THREAD_POOL_SIZE > 1) {
-            return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, THREAD_POOL_SIZE);
+            return DispatcherThread.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, THREAD_POOL_SIZE);
         } else {
-            return PriorityDispatcher.createPriorityDispatcher("TestDispatcher", Broker.MAX_PRIORITY);
+            return DispatcherThread.createPriorityDispatcher("TestDispatcher", Broker.MAX_PRIORITY);
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
Fri Dec  4 08:56:50 2009
@@ -19,9 +19,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
 /**
@@ -32,7 +32,7 @@
  */
 public abstract class AbstractFlowQueue<E> extends AbstractFlowRelay<E> implements FlowControllable<E>, IFlowQueue<E>, Dispatchable {
 
-    protected IDispatcher dispatcher;
+    protected Dispatcher dispatcher;
     protected DispatchContext dispatchContext;
     protected Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners;
     private boolean notifyReady = false;
@@ -141,7 +141,7 @@
      * @param dispatcher
      *            The dispatcher to handle messages.
      */
-    public synchronized void setDispatcher(IDispatcher dispatcher) {
+    public synchronized void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
         dispatchContext = dispatcher.register(this, getResourceName());
         dispatchContext.updatePriority(dispatchPriority);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
Fri Dec  4 08:56:50 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.flow.IFlowRelay;
 
 public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IFlowRelay<E> {
@@ -57,7 +57,7 @@
      * @param dispatcher
      *            The dispatcher to be used by the queue.
      */
-    public void setDispatcher(IDispatcher dispatcher);
+    public void setDispatcher(Dispatcher dispatcher);
 
     /**
      * Sets the base dispatch priority for the queue. Setting to higher value

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
Fri Dec  4 08:56:50 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.queue.QueueStore.PersistentQueue;
 import org.apache.activemq.util.Mapper;
 
@@ -47,7 +47,7 @@
      * @param dispatcher
      *            The dispatcher to be used by the queue.
      */
-    public void setDispatcher(IDispatcher dispatcher);
+    public void setDispatcher(Dispatcher dispatcher);
 
     /**
      * Sets the base dispatch priority for the queue. Setting to higher value

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
Fri Dec  4 08:56:50 2009
@@ -20,7 +20,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -30,7 +30,7 @@
     protected HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
     private HashMap<Integer, IQueue<K, V>> partitions = new HashMap<Integer, IQueue<K, V>>();
     protected QueueStore<K, V> store;
-    protected IDispatcher dispatcher;
+    protected Dispatcher dispatcher;
     protected boolean started;
     protected boolean shutdown = false;
     protected QueueDescriptor queueDescriptor;
@@ -239,7 +239,7 @@
         this.autoRelease = autoRelease;
     }
 
-    public void setDispatcher(IDispatcher dispatcher) {
+    public void setDispatcher(Dispatcher dispatcher) {
         checkShutdown();
         this.dispatcher = dispatcher;
         synchronized (this) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
Fri Dec  4 08:56:50 2009
@@ -10,7 +10,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.flow.AbstractLimiter;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.IFlowLimiter;
@@ -65,12 +65,12 @@
     private final int inputWindowSize = 1000;
     private final int inputResumeThreshold = 500;
 
-    private IDispatcher dispatcher;
+    private Dispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     protected boolean blockingTransport = false;
     ExecutorService blockingWriter;
 
-    public static void setInShutdown(boolean val, IDispatcher dispatcher) {
+    public static void setInShutdown(boolean val, Dispatcher dispatcher) {
         if (val != inShutdown.getAndSet(val)) {
             if (val) {
                 if (USE_RATE_BASED_LIMITER) {
@@ -274,11 +274,11 @@
         this.priorityLevels = priorityLevels;
     }
 
-    public IDispatcher getDispatcher() {
+    public Dispatcher getDispatcher() {
         return dispatcher;
     }
 
-    public void setDispatcher(IDispatcher dispatcher) {
+    public void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -455,12 +455,12 @@
 
     protected static class RateBasedLimiterCollector implements Runnable {
 
-        private IDispatcher dispatcher;
+        private Dispatcher dispatcher;
         private int samplingPeriod = 50;
         private boolean scheduled = false;
         private HashSet<RateBasedLimiter> limiters = new HashSet<RateBasedLimiter>();
 
-        public synchronized void setDispatcher(IDispatcher d) {
+        public synchronized void setDispatcher(Dispatcher d) {
             if (d != dispatcher) {
                 scheduled = false;
                 dispatcher = d;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
Fri Dec  4 08:56:50 2009
@@ -22,8 +22,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.transport.Transport;
@@ -53,7 +53,7 @@
     private TransportServer transportServer;
     private String uri;
     private String name;
-    protected IDispatcher dispatcher;
+    protected Dispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private boolean useInputQueues = false;
 
@@ -159,7 +159,7 @@
         error.printStackTrace();
     }
 
-    public IDispatcher getDispatcher() {
+    public Dispatcher getDispatcher() {
         return dispatcher;
     }
 
@@ -167,7 +167,7 @@
         this.name = name;
     }
 
-    public void setDispatcher(IDispatcher dispatcher) {
+    public void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -189,7 +189,7 @@
 
     protected void createDispatcher() {
         if (dispatcher == null) {
-            dispatcher = PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+            dispatcher = DispatcherThread.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
Fri Dec  4 08:56:50 2009
@@ -20,8 +20,8 @@
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
@@ -63,7 +63,7 @@
     protected MockBroker rcvBroker;
     protected MockClient client;
 
-    protected IDispatcher dispatcher;
+    protected Dispatcher dispatcher;
 
     static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
         public Long map(Message element) {
@@ -94,8 +94,8 @@
         }
     }
 
-    protected IDispatcher createDispatcher(String name) {
-        return PriorityDispatcher.createPriorityDispatchPool(name, Message.MAX_PRIORITY, threadsPerDispatcher);
+    protected Dispatcher createDispatcher(String name) {
+        return DispatcherThread.createPriorityDispatchPool(name, Message.MAX_PRIORITY, threadsPerDispatcher);
     }
 
     public void test_1_1_0() throws Exception {
@@ -284,7 +284,7 @@
             }
         }
 
-        IDispatcher clientDispatcher = null;
+        Dispatcher clientDispatcher = null;
         if (SEPARATE_CLIENT_DISPATCHER) {
             clientDispatcher = createDispatcher("ClientDispatcher");
             clientDispatcher.start();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
Fri Dec  4 08:56:50 2009
@@ -7,8 +7,8 @@
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Destination.DestinationBean;
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
@@ -43,7 +43,7 @@
     protected ArrayList<MetricCounter> additionalReportMetrics = new ArrayList<MetricCounter>();
     protected boolean includeDetailedRates = false;
 
-    protected IDispatcher dispatcher;
+    protected Dispatcher dispatcher;
 
     public RemoteConsumer consumer(int index) {
         return consumers.get(index);
@@ -214,7 +214,7 @@
         return testName;
     }
 
-    public void setDispatcher(IDispatcher dispatcher) {
+    public void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
 
@@ -274,13 +274,13 @@
         }
     }
 
-    public IDispatcher getDispatcher() {
+    public Dispatcher getDispatcher() {
         return dispatcher;
     }
 
-    protected IDispatcher createDispatcher() {
+    protected Dispatcher createDispatcher() {
         if (dispatcher == null) {
-            dispatcher = PriorityDispatcher.createPriorityDispatchPool("ClientDispatcher", numPriorities, threadsPerDispatcher);
+            dispatcher = DispatcherThread.createPriorityDispatchPool("ClientDispatcher", numPriorities, threadsPerDispatcher);
         }
         return dispatcher;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
Fri Dec  4 08:56:50 2009
@@ -2,8 +2,8 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Fri Dec  4 08:56:50 2009
@@ -13,9 +13,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
@@ -76,7 +76,7 @@
             }
         }
 
-        public void setDispatcher(IDispatcher dispatcher) {
+        public void setDispatcher(Dispatcher dispatcher) {
             readContext = dispatcher.register(this, name);
         }
 



Mime
View raw message