activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r891866 [1/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-dispatcher/src/main/java/org/apache/activemq/actor/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ activemq-dispatcher/src/main/java/org/apache/activemq/d...
Date Thu, 17 Dec 2009 20:21:06 GMT
Author: chirino
Date: Thu Dec 17 20:21:04 2009
New Revision: 891866

URL: http://svn.apache.org/viewvc?rev=891866&view=rev
Log:
adding a broker use case perf benchmark to exercise the actor thread model.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
      - copied, changed from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java
      - copied, changed from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/Actor.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/Message.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java (from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java&r1=891451&r2=891866&rev=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java Thu Dec 17 20:21:04 2009
@@ -16,7 +16,7 @@
 
 import static org.objectweb.asm.ClassWriter.*;
 
-public class AsmActor implements Opcodes {
+public class ActorProxy {
 
     public static <T> T create(Class<T> interfaceClass, T target, DispatchQueue queue) throws IllegalArgumentException {
         return create(target.getClass().getClassLoader(), interfaceClass, target, queue);
@@ -46,10 +46,10 @@
     }
 
     static private String proxyName(Class<?> clazz) {
-        return "org.apache.activemq.actor.generated."+clazz.getName();
+        return clazz.getName()+"$__ACTOR_PROXY__";
     }
     
-    private static final class Generator<T> {
+    private static final class Generator<T> implements Opcodes {
         
         private static final String RUNNABLE = "java/lang/Runnable";
         private static final String OBJECT_CLASS = "java/lang/Object";

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java Thu Dec 17 20:21:04 2009
@@ -17,6 +17,10 @@
 
 package org.apache.activemq.dispatch;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public enum DispatchOption {
     /**
      * Updates the target queue to be the

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java Thu Dec 17 20:21:04 2009
@@ -17,6 +17,10 @@
 
 package org.apache.activemq.dispatch;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public enum DispatchPriority {
     HIGH,
     DEFAULT,

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java Thu Dec 17 20:21:04 2009
@@ -19,7 +19,10 @@
 
 import java.nio.channels.SelectableChannel;
 
-
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public interface Dispatcher extends Retained {
 
     public DispatchQueue getGlobalQueue();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java Thu Dec 17 20:21:04 2009
@@ -1,11 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.dispatch;
 
 
 /**
  * Handy interface to signal classes which would like an Dispatcher instance
  * injected into them.
- *  
- * @author chirino
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public interface DispatcherAware {
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java Thu Dec 17 20:21:04 2009
@@ -19,6 +19,10 @@
 
 import org.apache.activemq.dispatch.internal.simple.DispatcherThread;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public interface DispatcherObserver {
     
     public void onThreadCreate(DispatcherThread thread);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Thu Dec 17 20:21:04 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.dispatch.internal;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.activemq.dispatch.DispatchObject;
 import org.apache.activemq.dispatch.DispatchQueue;
 
@@ -27,6 +29,7 @@
 
     protected volatile Object context;
     protected volatile DispatchQueue targetQueue;
+    protected final AtomicInteger suspendCounter = new AtomicInteger();
 
     @SuppressWarnings("unchecked")
     public <Context> Context getContext() {
@@ -45,5 +48,17 @@
         return this.targetQueue;
     }
     
+    public void resume() {
+        if( suspendCounter.decrementAndGet() == 0 ) {
+            onResume();
+        }
+    }
+
+    public void suspend() {
+        suspendCounter.incrementAndGet();
+    }
+    
+    protected void onResume() {
+    }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java Thu Dec 17 20:21:04 2009
@@ -36,7 +36,6 @@
 abstract public class AbstractSerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
 
     protected final String label;
-    protected final AtomicInteger suspendCounter = new AtomicInteger();
     protected final AtomicInteger executeCounter = new AtomicInteger();
     
     protected final AtomicLong externalQueueSize = new AtomicLong();
@@ -64,14 +63,9 @@
         return label;
     }
 
-    public void resume() {
-        if( suspendCounter.decrementAndGet() == 0 ) {
-            dispatchSelfAsync();
-        }
-    }
-
-    public void suspend() {
-        suspendCounter.incrementAndGet();
+    @Override
+    protected void onResume() {
+        dispatchSelfAsync();
     }
 
     public void execute(Runnable command) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java Thu Dec 17 20:21:04 2009
@@ -1,12 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.dispatch.internal;
 
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public class BaseRetained {
     
     final protected AtomicInteger retainCounter = new AtomicInteger(0);
-    final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>();
+    final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>(1);
 
     public void addShutdownWatcher(Runnable shutdownHandler) {
         synchronized(shutdownHandlers) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java Thu Dec 17 20:21:04 2009
@@ -19,6 +19,10 @@
 
 import java.util.concurrent.CountDownLatch;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public class RunnableCountDownLatch extends CountDownLatch implements Runnable {
     public RunnableCountDownLatch(int count) {
         super(count);

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class RunnableSupport {
+
+    private static Runnable NO_OP = new Runnable() {
+        public void run() {
+        }
+    };
+    
+    public static Runnable runNoop() {
+        return NO_OP;
+    }
+    
+    public static Runnable runOnceAfter(final Runnable runnable, int count) {
+        if( count <= 0 || runnable==null ) {
+            return NO_OP;
+        }
+        if( count == 1 ) {
+            return runnable;
+        }
+        final AtomicInteger counter = new AtomicInteger(count);
+        return new Runnable() {
+            public void run() {
+                if( counter.decrementAndGet()==0 ) {
+                    runnable.run();
+                }
+            }
+        };
+    }
+    
+    public static Runnable runAfter(final Runnable runnable, int count) {
+        if( count <= 0 || runnable==null ) {
+            return NO_OP;
+        }
+        if( count == 1 ) {
+            return runnable;
+        }
+        final AtomicInteger counter = new AtomicInteger(count);
+        return new Runnable() {
+            public void run() {
+                if( counter.decrementAndGet()<=0 ) {
+                    runnable.run();
+                }
+            }
+        };
+    }
+    
+    public static Runnable runOnceAfter(final DispatchQueue queue, final Runnable runnable, int count) {
+        if( count <= 0 || runnable==null ) {
+            return NO_OP;
+        }
+        final AtomicInteger counter = new AtomicInteger(count);
+        return new Runnable() {
+            public void run() {
+                if( counter.decrementAndGet()==0 ) {
+                    queue.dispatchAsync(runnable);
+                }
+            }
+        };
+    }
+    
+    public static Runnable runAfter(final DispatchQueue queue,  final Runnable runnable, int count) {
+        if( count <= 0 || runnable==null ) {
+            return NO_OP;
+        }
+        final AtomicInteger counter = new AtomicInteger(count);
+        return new Runnable() {
+            public void run() {
+                if( counter.decrementAndGet()<=0 ) {
+                    queue.dispatchAsync(runnable);
+                }
+            }
+        };
+    }
+
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java Thu Dec 17 20:21:04 2009
@@ -1,10 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.dispatch.internal.simple;
 
-
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ *
+ */
 public class IntegerCounter {
     
     int counter;
 
+    public IntegerCounter() {
+    }
+
+    public IntegerCounter(int count) {
+        this.counter = count;
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (this == obj)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Thu Dec 17 20:21:04 2009
@@ -24,6 +24,10 @@
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public final class SerialDispatchQueue extends AbstractSerialDispatchQueue implements SimpleQueue {
 
     private final SimpleDispatcher dispatcher;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java Thu Dec 17 20:21:04 2009
@@ -1,8 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.dispatch.internal.simple;
 
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchPriority;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ *
+ */
 public interface SimpleQueue extends DispatchQueue {
 
     DispatchPriority getPriority();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java Thu Dec 17 20:21:04 2009
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.dispatch.internal.simple;
 
 import java.util.ArrayList;
@@ -8,6 +24,10 @@
 
 import static org.apache.activemq.dispatch.internal.simple.TimerThread.Type.*;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 final public class TimerThread extends Thread {
     enum Type {
         RELATIVE,

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java Thu Dec 17 20:21:04 2009
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.actor;
 
 import java.util.concurrent.TimeUnit;
@@ -7,26 +23,21 @@
 
 import static java.lang.String.*;
 
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public class ActorBenchmark {
 
     public static class PizzaService implements IPizzaService {
         long counter;
 
-        @Message
         public void order(long count) {
             counter += count;
         }
     }
 
     @Test
-    public void benchmarkCGLibProxy() throws Exception {
-        String name = "cglib proxy";
-        PizzaService service = new PizzaService();
-        IPizzaService proxy = Actor.create(service, createQueue());
-        benchmark(name, service, proxy);
-    }
-
-    @Test
     public void benchmarkCustomProxy() throws Exception {
         String name = "custom proxy";
         PizzaService service = new PizzaService();
@@ -38,7 +49,7 @@
     public void benchmarkAsmProxy() throws Exception {
         String name = "asm proxy";
         PizzaService service = new PizzaService();
-        IPizzaService proxy = AsmActor.create(IPizzaService.class, service, createQueue());
+        IPizzaService proxy = ActorProxy.create(IPizzaService.class, service, createQueue());
         benchmark(name, service, proxy);
     }
 
@@ -47,12 +58,8 @@
             public void dispatchAsync(Runnable runnable) {
                 runnable.run();
             }
-
             public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
-                throw new RuntimeException("TODO: implement me.");
-                
             }
-
         };
     }
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java (from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java&r1=891451&r2=891866&rev=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java Thu Dec 17 20:21:04 2009
@@ -1,23 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.actor;
 
 import java.util.concurrent.TimeUnit;
 
-import junit.framework.Assert;
-
 import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
 import org.junit.Test;
 
 import static junit.framework.Assert.*;
 
-import static junit.framework.Assert.*;
-
-import static junit.framework.Assert.*;
-
-import static junit.framework.Assert.*;
-
-import static junit.framework.Assert.*;
-
-public class AsmActorTest {
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ActorProxyTest {
 
     public static interface TestInterface {
         void strings(String value, String[] value2);
@@ -65,7 +75,7 @@
             }
         };
         
-        proxy = AsmActor.create(TestInterface.class, service, createQueue());
+        proxy = ActorProxy.create(TestInterface.class, service, createQueue());
         proxy.strings(expected1, expected2);
 
     }
@@ -83,7 +93,7 @@
             }
         };
         
-        proxy = AsmActor.create(TestInterface.class, service, createQueue());
+        proxy = ActorProxy.create(TestInterface.class, service, createQueue());
         proxy.shorts(expected1, expected2);
 
     }
@@ -97,7 +107,7 @@
             }
         };
         
-        proxy = AsmActor.create(TestInterface.class, service, createQueue());
+        proxy = ActorProxy.create(TestInterface.class, service, createQueue());
         String actual = proxy.returnString();
         assertNull(actual);
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Thu Dec 17 20:21:04 2009
@@ -17,52 +17,44 @@
 import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
 
-import static org.apache.activemq.dispatch.DispatchOption.*;
-
-
-/** 
+/**
  * ActorTest
  * <p>
  * Description:
  * </p>
+ * 
  * @author cmacnaug
  * @version 1.0
  */
 public class ActorTest extends TestCase {
 
     
-    public static class ActorTestObject
-    {
-        @Message
-        public void actorInvocation(CountDownLatch latch)
-        {
-            latch.countDown();
-        }
-        
-        public void straightThrough(CountDownLatch latch)
-        {
+    interface TestObjectActor {
+        public void actorInvocation(CountDownLatch latch);
+    }
+    
+    public static class TestObject implements TestObjectActor {
+        public void actorInvocation(CountDownLatch latch) {
             latch.countDown();
         }
-        
     }
-    
-    public void testActorInvocation() throws Exception
-    {
+
+    public void testActorInvocation() throws Exception {
         Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
         advancedSystem.retain();
-        
-        DispatchQueue queue = advancedSystem.createSerialQueue("test", STICK_TO_CALLER_THREAD);
-        ActorTestObject testObject = Actor.create(new ActorTestObject(), queue);
-        
+
+        DispatchQueue queue = advancedSystem.createSerialQueue("test");
+        TestObjectActor actor = ActorProxy.create(TestObjectActor.class, new TestObject(), queue);
+
         CountDownLatch latch = new CountDownLatch(1);
-        testObject.actorInvocation(latch);
+        actor.actorInvocation(latch);
         assertTrue(latch.await(1, TimeUnit.SECONDS));
-         
+
         queue.suspend();
         latch = new CountDownLatch(1);
-        testObject.actorInvocation(latch);
+        actor.actorInvocation(latch);
         assertFalse("Suspended Queue shouldn't invoked method", latch.await(2, TimeUnit.SECONDS));
-        
+
         queue.resume();
         assertTrue("Resumed Queue should invoke method", latch.await(2, TimeUnit.SECONDS));
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java Thu Dec 17 20:21:04 2009
@@ -17,7 +17,9 @@
 
 package org.apache.activemq.actor;
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public interface IPizzaService {
-    @Message
     public void order(long count);
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java Thu Dec 17 20:21:04 2009
@@ -19,6 +19,9 @@
 
 import org.apache.activemq.dispatch.DispatchQueue;
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 public class PizzaServiceCustomProxy implements IPizzaService {
     private final DispatchQueue queue;
     private final IPizzaService target;

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.internal.BaseRetained;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.queue.actor.transport.TransportHandler;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class BaseConnection extends BaseRetained {
+
+    protected interface Protocol extends TransportHandler { // contract of the 'actor' field; startup() and shutdown() call it
+        void start(); // called by startup() right after createActor()
+        void shutdown(Runnable onShutdown); // onShutdown is the callback the implementation runs when it is done (see ProtocolImpl.shutdown)
+    }
+    
+    protected String name;
+    protected Dispatcher dispatcher;
+
+    protected DispatchQueue dispatchQueue;
+    protected Protocol actor;
+
+    
+    @Override
+    protected void startup() {
+        super.startup();
+        dispatchQueue = dispatcher.createSerialQueue(name); // needs 'dispatcher' and 'name' to have been set
+        createActor(); // subclass hook; it must assign 'actor', which is dereferenced on the next line
+        actor.start();
+    }
+
+    @Override
+    protected void shutdown() {
+        actor.shutdown(new Runnable() { // super.shutdown() is deferred until the actor runs this callback
+            public void run() {
+                // notifies registered shutdown handlers 
+                BaseConnection.super.shutdown();
+            }
+        });
+    }
+    
+    public static class WindowController extends WindowLimiter {
+        // A WindowLimiter that gives credit back in batches as items are reported processed.
+        private int maxSize;
+        private int processed; // items reported since the last batch was returned
+        private int creditsAt; // batch threshold; 0 until maxSize(int) is called, then half of maxSize
+        // Adds 'count' to the running total; once it reaches creditsAt the total is added to the window, returned and reset. Returns 0 otherwise.
+        public int processed(int count) {
+            int rc = 0;
+            processed += count;
+            if( processed >= creditsAt ) {
+                change(processed);
+                rc = processed;
+                processed = 0;
+            }
+            return rc;
+        }
+        // Sets the maximum size, applies the difference to the window and returns that difference.
+        int maxSize(int newMaxSize) {
+            int change = newMaxSize-maxSize;
+            this.maxSize=newMaxSize;
+            this.creditsAt = maxSize/2;
+            change(change);
+            return change;
+        }
+        // Returns the maximum size last set through maxSize(int).
+        int maxSize() {
+            return maxSize;
+        }
+
+    }
+    
+    public static class WindowLimiter {
+        // A size counter with an open/closed flag; a new instance is closed with size 0.
+        private int opensAt = 1; // a closed window opens when a positive change leaves size >= this value
+        private int size;
+        private boolean closed;
+        
+        public WindowLimiter() {
+            this.closed = true;
+        }
+
+        int size() {
+            return size;
+        }
+        // Overwrites size and returns this. NOTE(review): 'closed' is not re-evaluated here, so the window stays closed until a later positive change(...) - confirm intended.
+        WindowLimiter size(int size) {
+            this.size = size;
+            return this;
+        }
+        
+        public boolean isOpen() {
+            return !closed;
+        }
+        
+        public boolean isClosed() {
+            return closed;
+        }
+        // Adds 'change' to size. Only a positive change can open the window; only a negative change can close it (when size drops to 0 or below).
+        public void change(int change) {
+            size += change;
+            if( change > 0 && closed && size >= opensAt) {
+                closed = false;
+            } else if( change < 0 && !closed && size <= 0) {
+                closed = true;
+            }
+        }
+
+    }
+    
+    abstract protected void createActor(); // implementations assign the 'actor' field; startup() calls actor.start() immediately afterwards
+    
+    // The actor pattern ensures that this object is only accessed in
+    // serial execution context.  So synchronization is not required.
+    // It also places a restriction that all operations should 
+    // avoid mutex contention and avoid blocking IO calls.
+    protected class ProtocolImpl implements Protocol {
+        // inboundSessionWindow tracks the credit granted to the peer; the two outbound limiters gate our own sends (see isSessionSendBlocked).
+        final protected WindowController inboundSessionWindow = new WindowController();
+        final protected WindowLimiter outboundSessionWindow = new WindowLimiter();
+        final protected WindowLimiter outboundTransportWindow = new WindowLimiter();
+
+        protected Transport transport;
+        protected Runnable onShutdown;
+        protected boolean disconnected;
+        protected Exception failure;
+        // Sets the transport limiter's size to 100; size(int) leaves it flagged closed, so it first opens in onSendCompleted.
+        ProtocolImpl() {
+            outboundTransportWindow.size(100);
+        }
+        // Binds the transport to dispatchQueue and to this handler, then resumes it. 'transport' is not assigned in this class; subclasses set it first.
+        public void start() {
+            
+            transport.setTargetQueue(dispatchQueue);
+            transport.setHandler(this);
+            transport.resume();
+        }
+        // Runs onShutdown at once if already disconnected; otherwise stores it (onDisconnect() runs it) and releases the transport.
+        public void shutdown(Runnable onShutdown) {
+            if( disconnected ) {
+                onShutdown.run();
+            } else {
+                this.onShutdown = onShutdown;
+                transport.release();
+            }
+        }
+        // Sets the inbound window maximum to 1000 and sends the resulting change to the peer as flow-control credit.
+        public void onConnect() {
+            sendFlowControl(inboundSessionWindow.maxSize(1000));
+        }
+        // Marks the connection disconnected and completes a stored shutdown request, if there is one.
+        public void onDisconnect() {
+            disconnected = true;
+            if( onShutdown!=null ) {
+                shutdown(onShutdown);
+                onShutdown=null;
+            }
+        }
+        // Prints the stack trace and remembers the failure; nothing in this class closes the connection in response.
+        public void onFailure(Exception failure) {
+            failure.printStackTrace();
+            this.failure = failure;
+        }
+        // Dispatches a received command on its exact runtime class. NOTE(review): 'onRecevie' is misspelled; presumably it has to match TransportHandler - confirm.
+        public void onRecevie(Object command) {
+            if (command.getClass() == Message.class) {
+                // We should not be getting messages
+                // when the window is closed..
+                if( inboundSessionWindow.isClosed() ) {
+                    onFailure(new Exception("Session overrun: " + command)); // reported only; the message is still processed below
+                }
+                outboundSessionWindow.change(-1); // NOTE(review): the check above reads inboundSessionWindow, which nothing in this class decrements - confirm which window is meant
+                onReceiveMessage((Message) command);
+            } else if (command.getClass() == FlowControlBean.class || command.getClass() == FlowControlBuffer.class) {
+                onReceiveFlowControl((FlowControl) command);
+            } else if (command.getClass() == String.class) {
+                onReceiveString((String)command);
+            } else if (command.getClass() == DestinationBuffer.class || command.getClass() == DestinationBean.class) {
+                onReceiveDestination((Destination)command);
+            } else {
+                onFailure(new Exception("Unrecognized command: " + command));
+            }
+        }
+        // Sends a message to the peer. NOTE(review): no outboundSessionWindow credit is consumed here - confirm.
+        public void sessionSend(Message message) {
+            transportSend(message);
+        }
+        // The two hooks below do nothing by default; subclasses override them.
+        protected void onReceiveDestination(Destination command) {
+        }
+
+        protected void onReceiveString(String command) {
+        }
+        // Default handling: records one processed message and sends whatever credit that makes due.
+        protected void onReceiveMessage(Message msg) {
+            sendFlowControl(inboundSessionWindow.processed(1));
+        }
+        // Sends a FlowControlBean carrying 'credits'; a value of 0 sends nothing.
+        private void sendFlowControl(int credits) {
+            if( credits!=0 ) {
+                FlowControlBean fc = new FlowControlBean();
+                fc.setCredit(credits);
+                transportSend(fc);
+            }
+        }
+        // Takes one unit from the transport window, then sends; onSendCompleted and dispatchQueue are handed to the transport for the completion callback.
+        public void transportSend(Object message) {
+            outboundTransportWindow.change(-1);
+            transport.send(message, onSendCompleted, dispatchQueue);
+        }
+        // Completion callback: gives one unit back to the transport window and may call onSessionResume().
+        private final Runnable onSendCompleted = new Runnable() {
+            public void run() {
+                boolean wasClosed = outboundTransportWindow.isClosed();
+                outboundTransportWindow.change(1);
+                if( !wasClosed && !isSessionSendBlocked() ) { // NOTE(review): tests !wasClosed, whereas onReceiveFlowControl tests wasClosed - confirm the negation is intended
+                    onSessionResume();
+                }
+            }
+        };
+        // Adds the peer's credit to the outbound session window; resumes if that window was closed before and sending is no longer blocked.
+        protected void onReceiveFlowControl(FlowControl command) {
+            boolean wasClosed = outboundSessionWindow.isClosed();
+            outboundSessionWindow.change(command.getCredit());
+            if( wasClosed && !isSessionSendBlocked() ) {
+                onSessionResume();
+            }
+
+        }
+        // True while either outbound limiter is closed.
+        protected boolean isSessionSendBlocked() {
+            return outboundTransportWindow.isClosed() || outboundSessionWindow.isClosed(); 
+        }
+        // Hook called from the two resume checks above; does nothing by default.
+        protected void onSessionResume() {
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+    // startup() passes the name to createSerialQueue, so it has to be set before startup() runs.
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+    // startup() uses the dispatcher to create dispatchQueue, so it has to be set before startup() runs.
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+    
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.LinkedList;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.queue.actor.transport.Transport;
+
+/**
+ * Connection that routes received messages through a MockBroker and is itself a DeliveryTarget for dispatched messages.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BrokerConnection extends BaseConnection implements DeliveryTarget {
+    // Actor interface: adds the dispatch entry point that add(...) calls.
+    interface BrokerProtocol extends Protocol {
+        public void onBrokerDispatch(Message msg, Runnable r);
+    }
+    
+    public static class DispatchRequest {
+        // A message that could not be sent yet, paired with the callback to run once it has been sent.
+        private final Message message;
+        private final Runnable onComplete;
+
+        public DispatchRequest(Message message, Runnable onComplete) {
+            this.message = message;
+            this.onComplete = onComplete;
+        }
+        
+    }
+    
+    private MockBroker broker;
+    private BrokerProtocol brokerActor;
+    private Transport transport;
+    private int priorityLevels; // only read and written by its accessors in this class
+    // Wraps a BrokerProtocolImpl in an ActorProxy bound to dispatchQueue and stores it as both 'actor' and 'brokerActor'.
+    protected void createActor() {
+        actor = brokerActor = ActorProxy.create(BrokerProtocol.class, new BrokerProtocolImpl(), dispatchQueue);
+    }
+
+    protected class BrokerProtocolImpl extends ProtocolImpl implements BrokerProtocol {
+        // This 'name' shadows the connection's own name field; it is set when the peer sends a String command.
+        String name;
+        
+        @Override
+        public void start() {
+            this.transport = BrokerConnection.this.transport; // use the transport configured on the connection via setTransport
+            super.start();
+        }
+        
+        // TODO: to increase fairness: we might want to have a pendingQueue per sender
+        final LinkedList<DispatchRequest> pendingQueue = new LinkedList<DispatchRequest>(); 
+        
+        @Override
+        protected void onReceiveString(String remoteName) {
+            name = "broker->"+remoteName;
+        }
+        
+        @Override
+        protected void onReceiveMessage(final Message msg) {
+            // We don't dish out flow control credit until the broker
+            // lets us know that the message routing completed.
+            // In the slow consumer case, it could take a while for it
+            // to complete the routing and we don't want to have the producer
+            // send us more messages than the max session protocol window
+            // is configured with.
+            broker.router.route(msg, dispatchQueue, new Runnable() {
+                public void run() {
+                    BrokerProtocolImpl.super.onReceiveMessage(msg);
+                }
+            });
+        }
+        
+        @Override
+        protected void onReceiveDestination(Destination destination) {
+            broker.subscribe(destination, BrokerConnection.this);
+        }
+        // Sends at once and runs onComplete when sending is not blocked; otherwise queues the request for onSessionResume().
+        public void onBrokerDispatch(Message message, Runnable onComplete) {
+            if( !isSessionSendBlocked() ) {
+                sessionSend(message);
+                onComplete.run();
+            } else {
+                pendingQueue.add(new DispatchRequest(message, onComplete));
+            }
+        }
+        // Drains pendingQueue in insertion order for as long as sending is not blocked.
+        @Override
+        protected void onSessionResume() {
+            while( !isSessionSendBlocked() ) {
+                DispatchRequest request = pendingQueue.poll();
+                if( request==null ) {
+                    return;
+                }
+                sessionSend(request.message);
+                request.onComplete.run();
+            }
+        }
+        
+    }
+    // DeliveryTarget: hands the message and its callback to the actor proxy.
+    public void add(Message msg, Runnable r) {
+        brokerActor.onBrokerDispatch(msg, r);
+    }
+
+    public boolean hasSelector() {
+        return false; // this target never reports a selector
+    }
+
+    public boolean match(Message message) {
+        return true; // every message matches
+    }
+
+    public MockBroker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(MockBroker broker) {
+        this.broker = broker;
+    }
+
+    public Transport getTransport() {
+        return transport;
+    }
+
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    public int getPriorityLevels() {
+        return priorityLevels;
+    }
+
+    public void setPriorityLevels(int priorityLevels) {
+        this.priorityLevels = priorityLevels;
+    }
+    
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.queue.actor.transport.TransportFactorySystem;
+
+/**
+ * Connection that opens its own transport to connectUri and sends its name once connected.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ClientConnection extends BaseConnection {
+    
+    protected String connectUri; // passed to TransportFactorySystem.connect in ClientProtocolImpl.start()
+    
+    public void setConnectUri(String uri) {
+        this.connectUri = uri;
+    }
+    // Wraps a ClientProtocolImpl in an ActorProxy bound to dispatchQueue and stores it in 'actor'.
+    protected void createActor() {
+        actor = ActorProxy.create(Protocol.class, new ClientProtocolImpl(), dispatchQueue);
+    }
+
+    protected class ClientProtocolImpl extends ProtocolImpl  {
+        // Obtains the transport from connectUri, then lets ProtocolImpl.start() bind and resume it.
+        @Override
+        public void start()  {
+            transport = TransportFactorySystem.connect(dispatcher, connectUri);
+            super.start();
+        }
+        // After the inherited onConnect() work, sends this connection's name to the peer as a String command.
+        public void onConnect() {
+            super.onConnect();
+            super.transportSend(name);
+        }
+        
+    }
+    
+
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+
+/**
+ * Client connection that sends a destination on connect and counts received messages, optionally delaying each by thinkTime.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ConsumerConnection extends ClientConnection {
+    
+    private MetricAggregator totalConsumerRate; // must be set before start(); 'rate' is added to it there
+    private long thinkTime; // milliseconds (see the dispatchAfter call in onReceiveMessage)
+    private Destination destination;
+    private String selector; // only read and written by its accessors in this class
+    private boolean schedualWait = true; // only read and written by its accessors in this class; the spelling is part of the accessor names
+    private final MetricCounter rate = new MetricCounter();
+
+    protected void createActor() {
+        actor = ActorProxy.create(Protocol.class, new ProducerProtocolImpl(), dispatchQueue);
+    }
+    // NOTE(review): named "Producer..." although it lives in ConsumerConnection - possibly a copy/paste leftover; confirm before renaming.
+    class ProducerProtocolImpl extends ClientProtocolImpl {
+        // Names the rate counter after this connection and adds it to totalConsumerRate before starting the transport.
+        @Override
+        public void start() {
+            rate.name("Consumer " + name + " Rate");
+            totalConsumerRate.add(rate);
+            super.start();
+        }
+        // After the inherited onConnect() work, sends the destination to the peer.
+        @Override
+        public void onConnect() {
+            super.onConnect();
+            transportSend(destination);
+        }
+        // Counts the message and passes it to the inherited handler; when thinkTime > 0 both steps are deferred via dispatchQueue.dispatchAfter.
+        @Override
+        protected void onReceiveMessage(final Message msg) {
+            if (thinkTime > 0) {
+                dispatchQueue.dispatchAfter(new Runnable() {
+                    public void run() {
+                        rate.increment();
+                        ProducerProtocolImpl.super.onReceiveMessage(msg);
+                    }
+                }, thinkTime, TimeUnit.MILLISECONDS);
+
+            } else {
+                rate.increment();
+                super.onReceiveMessage(msg);
+            }
+        }
+        
+        
+    }
+
+    public MetricAggregator getTotalConsumerRate() {
+        return totalConsumerRate;
+    }
+
+    public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
+        this.totalConsumerRate = totalConsumerRate;
+    }
+
+    public long getThinkTime() {
+        return thinkTime;
+    }
+
+    public void setThinkTime(long thinkTime) {
+        this.thinkTime = thinkTime;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    public boolean isSchedualWait() {
+        return schedualWait;
+    }
+
+    public void setSchedualWait(boolean schedualWait) {
+        this.schedualWait = schedualWait;
+    }
+
+    public MetricCounter getRate() {
+        return rate;
+    }
+    
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.perf;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DeliveryTarget {
+
+    /**
+     * Tells whether this target filters messages with a selector.
+     *
+     * @return true if this sub has a selector
+     */
+    boolean hasSelector();
+
+    /**
+     * Tests a message against this target.
+     *
+     * @param message
+     *            the message to test
+     * @return the result of the match test for the given message
+     */
+    boolean match(Message message);
+
+    /**
+     * Hands a message to this target together with a callback.
+     *
+     * NOTE(review): the interface does not say when {@code r} is run -
+     * presumably once the target has finished with the message; confirm
+     * against the implementations.
+     *
+     * @param msg
+     *            the message being added
+     * @param r
+     *            the callback supplied alongside the message
+     */
+    void add(Message msg, Runnable r);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.io.Serializable;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.buffer.UTF8Buffer;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Message implements Serializable {
+
+    private static final long serialVersionUID = 6759761889075451996L;
+
+    /** Maps a message to its priority. */
+    public static final Mapper<Integer, Message> PRIORITY_MAPPER = new Mapper<Integer, Message>() {
+        public Integer map(Message element) {
+            return element.getPriority();
+        }
+    };
+
+    public static final int MAX_USER_PRIORITY = 10;
+    public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
+    public static final int SYSTEM_PRIORITY = MAX_PRIORITY;
+
+    public static final short TYPE_NORMAL = 0;
+    public static final short TYPE_FLOW_CONTROL = 1;
+    public static final short TYPE_FLOW_OPEN = 2;
+    public static final short TYPE_FLOW_CLOSE = 3;
+
+    /** Flow associated with this message; transient, so it is not serialized. */
+    transient Flow flow;
+
+    /**
+     * Frozen buffer backing this message. Mutators replace it with a modified,
+     * re-frozen copy instead of changing it in place.
+     */
+    private MessageBuffer message;
+
+    /**
+     * Builds a message from its individual parts and freezes the result.
+     *
+     * @param msgId
+     *            the message id
+     * @param producerId
+     *            the id of the producer
+     * @param msg
+     *            the payload text, stored as a UTF-8 buffer
+     * @param flow
+     *            the flow to associate with the message
+     * @param dest
+     *            the destination of the message
+     * @param priority
+     *            the message priority
+     */
+    Message(long msgId, int producerId, String msg, Flow flow, Destination dest, int priority) {
+        MessageBean bean = new MessageBean();
+        bean.setMsgId(msgId);
+        bean.setProducerId(producerId);
+        bean.setMsg(new UTF8Buffer(msg));
+        bean.setDest(dest);
+        bean.setPriority(priority);
+        this.message = bean.freeze();
+        this.flow = flow;
+    }
+
+    /**
+     * Copy constructor; the new instance shares the buffer and flow of
+     * {@code m}.
+     *
+     * @param m
+     *            the message to copy
+     */
+    Message(Message m) {
+        this.message = m.message;
+        this.flow = m.flow;
+    }
+
+    /**
+     * Wraps an existing buffer. The flow is left unset ({@code null}).
+     *
+     * @param m
+     *            the buffer to wrap
+     */
+    public Message(MessageBuffer m) {
+        this.message = m;
+    }
+
+    /**
+     * @return the message type; always {@link #TYPE_NORMAL} for this class
+     */
+    public short type() {
+        return TYPE_NORMAL;
+    }
+
+    /**
+     * Adds a property to the message by replacing the backing buffer with a
+     * modified copy.
+     *
+     * @param matchProp
+     *            the property to add
+     */
+    public void setProperty(String matchProp) {
+        message = message.copy().addProperty(matchProp).freeze();
+    }
+
+    /**
+     * @param matchProp
+     *            the property to look for
+     * @return true if the message carries properties and {@code matchProp} is
+     *         one of them
+     */
+    public boolean match(String matchProp) {
+        if (!message.hasProperty()) {
+            return false;
+        }
+        return message.getPropertyList().contains(matchProp);
+    }
+
+    /**
+     * @return always false for this class
+     */
+    public boolean isSystem() {
+        return false;
+    }
+
+    /**
+     * Increases the hop count by one, replacing the backing buffer with a
+     * modified copy.
+     */
+    public void incrementHopCount() {
+        // The previous code wrote the current hop count back unchanged, so the
+        // count never advanced.
+        message = message.copy().setHopCount(message.getHopCount() + 1).freeze();
+    }
+
+    /**
+     * @return the hop count stored in the backing buffer
+     */
+    public final int getHopCount() {
+        return message.getHopCount();
+    }
+
+    /**
+     * @return the destination stored in the backing buffer
+     */
+    public final Destination getDestination() {
+        return message.getDest();
+    }
+
+    /**
+     * @return the flow associated with this message; may be {@code null} when
+     *         the message was created from a bare buffer
+     */
+    public Flow getFlow() {
+        return flow;
+    }
+
+    /**
+     * @return the size reported for flow limiting; always 1
+     */
+    public int getFlowLimiterSize() {
+        return 1;
+    }
+
+    /**
+     * @return the priority stored in the backing buffer
+     */
+    public int getPriority() {
+        return message.getPriority();
+    }
+
+    @Override
+    public String toString() {
+        return "Message: " + message.getMsg() + " flow + " + flow + " dest: " + message.getDest();
+    }
+
+    /**
+     * @return the message id stored in the backing buffer
+     */
+    public long getMsgId() {
+        return message.getMsgId();
+    }
+
+    /**
+     * @return the producer id stored in the backing buffer
+     */
+    public int getProducerId() {
+        return message.getProducerId();
+    }
+
+    /**
+     * @return the frozen buffer backing this message
+     */
+    public MessageBuffer getProto() {
+        return message;
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface MessageGenerator {
+
+    /**
+     * Callback notified with a message that is ready.
+     */
+    interface MessageReadyListener {
+        /**
+         * @param m
+         *            the message that is ready
+         */
+        void onMessageReady(Message m);
+    }
+
+    /**
+     * Registers a listener with this generator.
+     *
+     * @param listener
+     *            the listener to register
+     */
+    void addMessageReadyListener(MessageReadyListener listener);
+
+    /**
+     * Polls the generator for a message.
+     *
+     * NOTE(review): presumably returns {@code null} when no message is
+     * available - confirm against the implementations.
+     *
+     * @return the polled message
+     */
+    Message pollMessage();
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherConfig;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.queue.actor.transport.TransportFactorySystem;
+import org.apache.activemq.queue.actor.transport.TransportServer;
+import org.apache.activemq.queue.actor.transport.TransportServerHandler;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MockBroker implements TransportServerHandler {
+
+    final Router router = new Router();
+
+    /** Connections created by {@link #onAccept(Transport)}. */
+    final ArrayList<BrokerConnection> connections = new ArrayList<BrokerConnection>();
+    /** Connections retained in {@link #startServices()} and released on stop. */
+    final ArrayList<BrokerConnection> brokerConnections = new ArrayList<BrokerConnection>();
+    /** Queues registered through {@link #addQueue(MockQueue)}, keyed by destination. */
+    final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
+
+    /** Set by {@link #startServices()}; null until then. */
+    private TransportServer transportServer;
+    private String uri;
+    private String name;
+    protected Dispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean();
+    private boolean useInputQueues = false;
+
+    private DispatchQueue brokerDispatchQueue;
+
+    public boolean isUseInputQueues() {
+        return useInputQueues;
+    }
+
+    public void setUseInputQueues(boolean useInputQueues) {
+        this.useInputQueues = useInputQueues;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Attaches a delivery target to a destination. Point-to-point destinations
+     * are served by the queue registered for them; all other destinations are
+     * bound directly in the router.
+     *
+     * @param destination
+     *            the destination to subscribe to
+     * @param deliveryTarget
+     *            the target that should receive the messages
+     * @throws IllegalArgumentException
+     *             if the destination is point-to-point and no queue has been
+     *             registered for it via {@link #addQueue(MockQueue)}
+     */
+    public void subscribe(Destination destination, DeliveryTarget deliveryTarget) {
+        if (destination.getPtp()) {
+            MockQueue queue = queues.get(destination);
+            if (queue == null) {
+                // Fail with a message naming the destination instead of a bare
+                // NullPointerException.
+                throw new IllegalArgumentException("No queue registered for destination: " + destination);
+            }
+            queue.addConsumer(deliveryTarget);
+        } else {
+            router.bind(deliveryTarget, destination);
+        }
+    }
+
+    /**
+     * Registers a queue: binds it in the router under its destination and
+     * records it so consumers can later be attached through
+     * {@link #subscribe(Destination, DeliveryTarget)}.
+     *
+     * @param queue
+     *            the queue to register
+     */
+    public void addQueue(MockQueue queue) {
+        router.bind(queue, queue.getDestination());
+        queues.put(queue.getDestination(), queue);
+    }
+
+    /**
+     * Marks the broker as stopping, releases the transport server (if one was
+     * created) and all connections, and stops every registered queue.
+     *
+     * @throws Exception
+     *             if releasing or stopping a component fails
+     */
+    public final void stopServices() throws Exception {
+        stopping.set(true);
+        // The server is only created in startServices(); it is still null when
+        // startup failed before the bind or was never attempted.
+        if (transportServer != null) {
+            transportServer.release();
+        }
+
+        for (BrokerConnection connection : connections) {
+            connection.release();
+        }
+        for (BrokerConnection connection : brokerConnections) {
+            connection.release();
+        }
+        for (MockQueue queue : queues.values()) {
+            queue.stop();
+        }
+    }
+
+    /**
+     * Creates the broker's serial dispatch queue, binds the transport server to
+     * the configured uri with this broker as its handler, then starts the
+     * registered queues and retains the broker connections.
+     *
+     * @throws Exception
+     *             if binding the transport or starting a component fails
+     */
+    public final void startServices() throws Exception {
+        brokerDispatchQueue = dispatcher.createSerialQueue("broker");
+        transportServer = TransportFactorySystem.bind(dispatcher, uri);
+        transportServer.setTargetQueue(brokerDispatchQueue);
+        transportServer.setHandler(this);
+        transportServer.resume();
+
+        for (MockQueue queue : queues.values()) {
+            queue.start();
+        }
+
+        for (BrokerConnection connection : brokerConnections) {
+            connection.retain();
+        }
+    }
+
+    public void onBind() {
+    }
+
+    public void onUnbind() {
+    }
+
+    /**
+     * Wraps an accepted transport in a new {@link BrokerConnection}, records
+     * it, and retains it. A failure while retaining is reported through
+     * {@link #onFailure(Exception)}.
+     *
+     * @param transport
+     *            the newly accepted transport
+     */
+    public void onAccept(final Transport transport) {
+        BrokerConnection connection = new BrokerConnection();
+        connection.setBroker(this);
+        connection.setTransport(transport);
+        connection.setPriorityLevels(MockBrokerTest.PRIORITY_LEVELS);
+        connection.setDispatcher(dispatcher);
+        connections.add(connection);
+        try {
+            connection.retain();
+        } catch (Exception e1) {
+            onFailure(e1);
+        }
+    }
+
+    /**
+     * Reports an accept failure on standard output together with its stack
+     * trace.
+     *
+     * @param error
+     *            the failure to report
+     */
+    public void onFailure(Exception error) {
+        System.out.println("Accept error: " + error);
+        error.printStackTrace();
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    /**
+     * @return the connect URI reported by the transport server; only usable
+     *         after {@link #startServices()} has created the server
+     */
+    public String getConnectURI() {
+        return transportServer.getConnectURI();
+    }
+
+    public boolean isStopping() {
+        return stopping.get();
+    }
+
+    /**
+     * Creates a dispatcher named "mock-broker", sized to the number of
+     * available processors, unless one has already been set.
+     */
+    protected void createDispatcher() {
+        if (dispatcher == null) {
+            dispatcher = DispatcherConfig.create("mock-broker", Runtime.getRuntime().availableProcessors());
+        }
+    }
+
+    /**
+     * Run the broker as a standalone app
+     *
+     * @param args
+     *            The arguments; {@code args[0]} is the bind uri.
+     */
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.err.println("Must supply a bind uri");
+            // Without this return the args[0] access below throws
+            // ArrayIndexOutOfBoundsException.
+            return;
+        }
+        String uri = args[0];
+
+        final MockBroker broker = new MockBroker();
+        broker.setUri(uri);
+        broker.setName("Broker");
+        broker.createDispatcher();
+        try {
+            broker.getDispatcher().retain();
+            broker.startServices();
+        } catch (Exception e) {
+            // Startup failures are only reported; the shutdown hook below is
+            // still registered and stopServices() tolerates a missing server.
+            e.printStackTrace();
+        }
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                try {
+                    broker.stopServices();
+                } catch (Exception e) {
+                    // Nothing more can be done during JVM shutdown; report only.
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+    /**
+     * No-op overload for the {@code org.apache.activemq.transport.Transport}
+     * type; accepted transports of that type are ignored.
+     *
+     * @param transport
+     *            the accepted transport (unused)
+     */
+    public void onAccept(org.apache.activemq.transport.Transport transport) {
+    }
+
+}
\ No newline at end of file



Mime
View raw message