activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r887251 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ activemq-dispatcher/src/test/java/org/apache/a...
Date Fri, 04 Dec 2009 16:20:35 GMT
Author: chirino
Date: Fri Dec  4 16:20:34 2009
New Revision: 887251

URL: http://svn.apache.org/viewvc?rev=887251&view=rev
Log:
Eliminated the Dispatchable class to simpliy interfaces in the advanced package.


Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
Fri Dec  4 16:20:34 2009
@@ -7,7 +7,6 @@
 import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISinkController;
@@ -16,7 +15,7 @@
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.transport.TransportFactory;
 
-abstract public class RemoteProducer extends Connection implements Dispatchable, FlowUnblockListener<MessageDelivery>
{
+abstract public class RemoteProducer extends Connection implements FlowUnblockListener<MessageDelivery>
{
 
     protected final MetricCounter rate = new MetricCounter();
 
@@ -58,10 +57,38 @@
         
         setupProducer();
         
-        dispatchContext = getDispatcher().register(this, name + "-client");
+        dispatchContext = getDispatcher().register(new Runnable(){
+            public void run() {
+                dispatch();
+            }
+        }, name + "-client");
         dispatchContext.requestDispatch();
 
     }
+    
+    public void dispatch() {
+        while(true)
+        {
+            
+            if(next == null)
+            {
+                createNextMessage();
+            }
+            
+            //If flow controlled stop until flow control is lifted.
+            if(outboundController.isSinkBlocked())
+            {
+                if(outboundController.addUnblockListener(this))
+                {
+                    return;
+                }
+            }
+            
+            outboundQueue.add(next, null);
+            rate.increment();
+            next = null;
+        }
+    }
 
     abstract protected void setupProducer() throws Exception;
     
@@ -77,30 +104,6 @@
 		dispatchContext.requestDispatch();
 	}
 
-	public boolean dispatch() {
-		while(true)
-		{
-			
-			if(next == null)
-			{
-	            createNextMessage();
-			}
-	        
-			//If flow controlled stop until flow control is lifted.
-			if(outboundController.isSinkBlocked())
-			{
-				if(outboundController.addUnblockListener(this))
-				{
-					return true;
-				}
-			}
-			
-			outboundQueue.add(next, null);
-	        rate.increment();
-	        next = null;
-		}
-	}
-
     protected String createPayload() {
         if( payloadSize>=0 ) {
             StringBuilder sb = new StringBuilder(payloadSize);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
Fri Dec  4 16:20:34 2009
@@ -22,17 +22,15 @@
 public interface Dispatcher extends Executor {
 
     /**
-     * Registers a {@link Dispatchable} with this dispatcher, and returns a
+     * Registers a {@link Runnable} with this dispatcher, and returns a
      * {@link DispatchContext} that the caller can use to request dispatch.
      * 
-     * @param dispatchable
-     *            The {@link Dispatchable}
+     * @param runnable
+     *            The {@link Runnable}
      * @param name
      *            An identifier for the dispatcher.
      * @return A {@link DispatchContext} that can be used to request dispatch
      */
-    public DispatchContext register(Dispatchable dispatchable, String name);
-    
     public DispatchContext register(Runnable runnable, String name);
 
     /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
Fri Dec  4 16:20:34 2009
@@ -163,10 +163,6 @@
         }
     }
 
-    public DispatchContext register(Dispatchable dispatchable, String name) {
-        return chooseDispatcher().register(dispatchable, name);
-    }
-
     public DispatchContext register(Runnable runnable, String name) {
         return chooseDispatcher().register(runnable, name);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
Fri Dec  4 16:20:34 2009
@@ -144,10 +144,6 @@
         }
     }
 
-    public DispatchContext register(Dispatchable dispatchable, String name) {
-        return register(new DispachableAdapter(dispatcherPool, dispatchable), name);
-    }
-    
     public DispatchContext register(Runnable runnable, String name) {
         return new PriorityDispatchContext(runnable, true, name);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
Fri Dec  4 16:20:34 2009
@@ -55,7 +55,7 @@
         counter.await();
     }
     
-    private static final class Work implements Dispatchable {
+    private static final class Work implements Runnable {
         private final CountDownLatch counter;
         private final DispatchContext context;
 
@@ -64,12 +64,11 @@
             this.context = pooledDispatcher.register(this , "test");
         }
 
-        public boolean dispatch() {
+        public void run() {
             counter.countDown();
             if( counter.getCount()>0 ) {
                 context.requestDispatch();
             }
-            return true;
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
Fri Dec  4 16:20:34 2009
@@ -289,7 +289,7 @@
         }
     }
 
-    class Producer implements Dispatchable, FlowUnblockListener<OpenWireMessageDelivery>
{
+    class Producer implements FlowUnblockListener<OpenWireMessageDelivery> {
         private AtomicBoolean stopped = new AtomicBoolean(false);
         private String name;
         protected final MetricCounter sendRate = new MetricCounter();
@@ -312,7 +312,11 @@
             this.name = name;
             sendRate.name("Producer " + name + " Rate");
             totalProducerRate.add(sendRate);
-            dispatchContext = dispatcher.register(this, name);
+            dispatchContext = dispatcher.register(new Runnable(){
+                public void run() {
+                    dispatch();
+                }
+            }, name);
             // create a 1024 byte payload (2 bytes per char):
             payload = new String(new byte[512]);
             producerId = new ProducerId(name);
@@ -362,18 +366,18 @@
             stopped.set(true);
         }
 
-        public boolean dispatch() {
+        public void dispatch() {
             // If flow controlled stop until flow control is lifted.
             if (outboundController.isSinkBlocked()) {
                 if (outboundController.addUnblockListener(this)) {
-                    return true;
+                    return;
                 }
             }
 
             if( TEST_MAX_STORE_LATENCY ) {
             	// We can't send again until we get persist ack.
             	if( waitingForAck.get() ) {
-                    return true;
+                    return;
             	}
             }
             
@@ -392,14 +396,16 @@
                 } catch (JMSException e) {
                     e.printStackTrace();
                     stopped.set(true);
-                    return true;
+                    return;
                 }
             }
 
             sendRate.increment();
             outboundQueue.add(next, null);
             next = null;
-            return stopped.get();
+            if ( !stopped.get() ) {
+                dispatchContext.requestDispatch();
+            }
         }
 
         private OpenWireMessageDelivery createNextMessage() throws JMSException {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
Fri Dec  4 16:20:34 2009
@@ -20,17 +20,16 @@
 import java.util.Collection;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
 /**
- * Base class for a {@link Dispatchable} {@link FlowControllable}
+ * Base class for a {@link FlowControllable}
  * {@link IFlowQueue}.
  * 
  * @param <E>
  */
-public abstract class AbstractFlowQueue<E> extends AbstractFlowRelay<E> implements
FlowControllable<E>, IFlowQueue<E>, Dispatchable {
+public abstract class AbstractFlowQueue<E> extends AbstractFlowRelay<E> implements
FlowControllable<E>, IFlowQueue<E> {
 
     protected Dispatcher dispatcher;
     protected DispatchContext dispatchContext;
@@ -58,14 +57,6 @@
         this.listener = listener;
     }
 
-    public final boolean dispatch() {
-
-        // while (pollingDispatch());
-        // return true;
-
-        return !pollingDispatch();
-    }
-
     protected final FlowControllable<E> getFlowControllableHook() {
         return this;
     }
@@ -143,7 +134,12 @@
      */
     public synchronized void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
-        dispatchContext = dispatcher.register(this, getResourceName());
+        dispatchContext = dispatcher.register(new Runnable(){
+            public void run() {
+                if( pollingDispatch() ) {
+                    dispatchContext.requestDispatch();
+                }
+            }}, getResourceName());
         dispatchContext.updatePriority(dispatchPriority);
         super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities()
- 1));
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
Fri Dec  4 16:20:34 2009
@@ -3,7 +3,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
@@ -12,7 +11,7 @@
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 
-public class RemoteProducer extends ClientConnection implements Dispatchable, FlowUnblockListener<Message>
{
+public class RemoteProducer extends ClientConnection implements FlowUnblockListener<Message>
{
 
     private final MetricCounter rate = new MetricCounter();
 
@@ -46,7 +45,11 @@
 
         super.start();
         outboundController = outputQueue.getFlowController(outboundFlow);
-        dispatchContext = getDispatcher().register(this, name + "-client");
+        dispatchContext = getDispatcher().register(new Runnable() {
+            public void run() {
+                dispatch();
+            }
+        }, name + "-client");
         dispatchContext.requestDispatch();
     }
 
@@ -59,7 +62,7 @@
         dispatchContext.requestDispatch();
     }
 
-    public boolean dispatch() {
+    public void dispatch() {
         while (true) {
 
             if (next == null) {
@@ -77,14 +80,14 @@
             // If flow controlled stop until flow control is lifted.
             if (outboundController.isSinkBlocked()) {
                 if (outboundController.addUnblockListener(this)) {
-                    return true;
+                    return;
                 }
             }
 
             getSink().add(next, null);
             rate.increment();
             next = null;
-            return false;
+            dispatchContext.requestDispatch();
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887251&r1=887250&r2=887251&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Fri Dec  4 16:20:34 2009
@@ -14,7 +14,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.FutureResponse;
@@ -37,7 +36,7 @@
 
     static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String,
PipeTransportServer>();
 
-    protected static class PipeTransport implements DispatchableTransport, Dispatchable,
Runnable, ReadReadyListener<Object> {
+    protected static class PipeTransport implements DispatchableTransport, Runnable, ReadReadyListener<Object>
{
 
         private final Pipe<Object> pipe;
         private TransportListener listener;
@@ -77,7 +76,11 @@
         }
 
         public void setDispatcher(Dispatcher dispatcher) {
-            readContext = dispatcher.register((Dispatchable)this, name);
+            readContext = dispatcher.register(new Runnable() {
+                public void run() {
+                    dispatch();
+                }
+            }, name);
         }
 
         public void onReadReady(Pipe<Object> pipe) {
@@ -108,13 +111,13 @@
              */
         }
 
-        public boolean dispatch() {
+        public void dispatch() {
             while (true) {
                 try {
                     Object o = pipe.poll();
                     if (o == null) {
                         pipe.setReadReadyListener(this);
-                        return true;
+                        return;
                     } else {
                     	if(o == EOF_TOKEN) {
                     		throw new EOFException();
@@ -124,7 +127,8 @@
                         } else {
                             listener.onCommand(o);
                         }
-                        return false;
+                        readContext.requestDispatch();
+                        return;
                     }
                 } catch (IOException e) {
                     listener.onException(e);



Mime
View raw message