activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r892348 - in /activemq/sandbox/activemq-apollo-actor: activemq-dispatcher/src/main/java/org/apache/activemq/actor/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/ activemq-protobuf/activemq-protobuf/src/main/java/or...
Date Fri, 18 Dec 2009 18:58:35 GMT
Author: chirino
Date: Fri Dec 18 18:58:35 2009
New Revision: 892348

URL: http://svn.apache.org/viewvc?rev=892348&view=rev
Log:
Fixing up flow control.. still needs tweaking tho.


Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/IntrospectionSupport.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/CommandLineSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
Fri Dec 18 18:58:35 2009
@@ -81,7 +81,7 @@
             // Define all the runnable classes used for each method.
             Method[] methods = interfaceClass.getMethods();
             for (int index = 0; index < methods.length; index++) {
-                String name = runnable(index).replace('/', '.');
+                String name = runnable(index, methods[index]).replace('/', '.');
                 byte[] clazzBytes = dumpRunnable(index, methods[index]);
                 defineClass(name, clazzBytes);
             }
@@ -170,7 +170,7 @@
                         mv.visitLabel(start);
                         mv.visitVarInsn(ALOAD, 0);
                         mv.visitFieldInsn(GETFIELD, proxyName, "queue", sig(DISPATCH_QUEUE));
-                        mv.visitTypeInsn(NEW, runnable(index));
+                        mv.visitTypeInsn(NEW, runnable(index, methods[index]));
                         mv.visitInsn(DUP);
                         mv.visitVarInsn(ALOAD, 0);
                         mv.visitFieldInsn(GETFIELD, proxyName, "target", sig(interfaceName));
@@ -179,7 +179,7 @@
                             mv.visitVarInsn(types[i].getOpcode(ILOAD), 1+i);
                         }
                         
-                        mv.visitMethodInsn(INVOKESPECIAL, runnable(index), "<init>",
"(" + sig(interfaceName) + sig(params) +")V");
+                        mv.visitMethodInsn(INVOKESPECIAL, runnable(index, methods[index]),
"<init>", "(" + sig(interfaceName) + sig(params) +")V");
                         mv.visitMethodInsn(INVOKEINTERFACE, DISPATCH_QUEUE, "dispatchAsync",
"(" + sig(RUNNABLE) + ")V");
                         
                         Type returnType = Type.getType(method.getReturnType());
@@ -239,7 +239,8 @@
             Label start, end;
     
             // example: final class OrderRunnable implements Runnable
-            cw.visit(V1_4, ACC_FINAL + ACC_SUPER, runnable(index), null, OBJECT_CLASS, new
String[] { RUNNABLE });
+            String runnableClassName = runnable(index, method);
+            cw.visit(V1_4, ACC_FINAL + ACC_SUPER, runnableClassName, null, OBJECT_CLASS,
new String[] { RUNNABLE });
             {
     
                 // example: private final IPizzaService target;
@@ -271,7 +272,7 @@
                     // example: this.target = target;
                     mv.visitVarInsn(ALOAD, 0);
                     mv.visitVarInsn(ALOAD, 1);
-                    mv.visitFieldInsn(PUTFIELD, runnable(index), "target", sig(interfaceName));
+                    mv.visitFieldInsn(PUTFIELD, runnableClassName, "target", sig(interfaceName));
                     
                     // example: this.count = count;
                     for (int i = 0; i < params.length; i++) {
@@ -279,7 +280,7 @@
                         // TODO: figure out how to do the right loads. it varies with the
type.
                         mv.visitVarInsn(ALOAD, 0);
                         mv.visitVarInsn(types[i].getOpcode(ILOAD), 2+i);
-                        mv.visitFieldInsn(PUTFIELD, runnable(index), "param"+i, sig(params[i]));
+                        mv.visitFieldInsn(PUTFIELD, runnableClassName, "param"+i, sig(params[i]));
                         
                     }
                     
@@ -288,7 +289,7 @@
                     
                     end = new Label();
                     mv.visitLabel(end);
-                    mv.visitLocalVariable("this", sig(runnable(index)), null, start, end,
0);
+                    mv.visitLocalVariable("this", sig(runnableClassName), null, start, end,
0);
                     mv.visitLocalVariable("target", sig(interfaceName), null, start, end,
1);
                     
                     for (int i = 0; i < params.length; i++) {
@@ -307,11 +308,11 @@
                     start = new Label();
                     mv.visitLabel(start);
                     mv.visitVarInsn(ALOAD, 0);
-                    mv.visitFieldInsn(GETFIELD, runnable(index), "target", sig(interfaceName));
+                    mv.visitFieldInsn(GETFIELD, runnableClassName, "target", sig(interfaceName));
                     
                     for (int i = 0; i < params.length; i++) {
                         mv.visitVarInsn(ALOAD, 0);
-                        mv.visitFieldInsn(GETFIELD, runnable(index), "param"+i, sig(params[i]));
+                        mv.visitFieldInsn(GETFIELD, runnableClassName, "param"+i, sig(params[i]));
                     }
                     
                     String methodSig = Type.getMethodDescriptor(method);
@@ -327,7 +328,7 @@
             
                     end = new Label();
                     mv.visitLabel(end);
-                    mv.visitLocalVariable("this", sig(runnable(index)), null, start, end,
0);
+                    mv.visitLocalVariable("this", sig(runnableClassName), null, start, end,
0);
                     mv.visitMaxs(0, 0);
                 }
                 mv.visitEnd();
@@ -350,8 +351,8 @@
             return Type.getDescriptor(clazz);
         }
     
-        private String runnable(int index) {
-            return proxyName+"$method"+index;
+        private String runnable(int index, Method method) {
+            return proxyName+"$"+index+"$"+method.getName();
         }
     
         private String sig(String name) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java
Fri Dec 18 18:58:35 2009
@@ -29,6 +29,9 @@
     private static Runnable NO_OP = new Runnable() {
         public void run() {
         }
+        public String toString() {
+            return "{}";
+        };
     };
     
     public static Runnable runNoop() {
@@ -49,6 +52,9 @@
                     runnable.run();
                 }
             }
+            public String toString() {
+                return "{"+runnable+"}";
+            };
         };
     }
     
@@ -66,6 +72,9 @@
                     runnable.run();
                 }
             }
+            public String toString() {
+                return "{"+runnable+"}";
+            };
         };
     }
     
@@ -80,6 +89,9 @@
                     queue.dispatchAsync(runnable);
                 }
             }
+            public String toString() {
+                return "{"+runnable+"}";
+            };
         };
     }
     
@@ -94,6 +106,9 @@
                     queue.dispatchAsync(runnable);
                 }
             }
+            public String toString() {
+                return "{"+runnable.toString()+"}";
+            };
         };
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/CommandLineSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/CommandLineSupport.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/CommandLineSupport.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-protobuf/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/CommandLineSupport.java
Fri Dec 18 18:58:35 2009
@@ -19,6 +19,8 @@
 
 import java.util.ArrayList;
 
+import org.apache.activemq.util.IntrospectionSupport;
+
 /**
  * Support utility that can be used to set the properties on any object
  * using command line arguments.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
Fri Dec 18 18:58:35 2009
@@ -76,7 +76,7 @@
         protected Exception failure;
 
         ConnectionState() {
-            outboundTransportWindow.size(100);
+            outboundTransportWindow.size(500).opensAt(499);
         }
         
         public void onStart() {
@@ -127,10 +127,6 @@
             }
         }
 
-        public void sessionSend(Message message) {
-            transportSend(message);
-        }
-        
         protected void onReceiveDestination(Destination command) {
         }
 
@@ -149,16 +145,22 @@
             }
         }
         
+        public void sessionSend(Message message) {
+            outboundSessionWindow.change(-1);
+            transportSend(message);
+        }
+        
         public void transportSend(Object message) {
-            outboundTransportWindow.change(-1);
-            transport.send(message, onSendCompleted, dispatchQueue);
+//            outboundTransportWindow.change(-1);
+//            transport.send(message, onSendCompleted, dispatchQueue);
+            transport.send(message);
         }
         
         private final Runnable onSendCompleted = new Runnable() {
             public void run() {
                 boolean wasClosed = outboundTransportWindow.isClosed();
                 outboundTransportWindow.change(1);
-                if( !wasClosed && !isSessionSendBlocked() ) {
+                if( wasClosed && !isSessionSendBlocked() ) {
                     onSessionResume();
                 }
             }
@@ -167,6 +169,7 @@
         protected void onReceiveFlowControl(FlowControl command) {
             boolean wasClosed = outboundSessionWindow.isClosed();
             outboundSessionWindow.change(command.getCredit());
+            
             if( wasClosed && !isSessionSendBlocked() ) {
                 onSessionResume();
             }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
Fri Dec 18 18:58:35 2009
@@ -20,7 +20,9 @@
 
 import org.apache.activemq.actor.ActorProxy;
 import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
 import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.util.IntrospectionSupport;
 
 /**
  * 
@@ -42,6 +44,10 @@
             this.onComplete = onComplete;
         }
         
+        @Override
+        public String toString() {
+            return IntrospectionSupport.toString(this);
+        }
     }
     
     private MockBroker broker;
@@ -65,6 +71,7 @@
         
         // TODO: to increase fairness: we might want to have a pendingQueue per sender
         final LinkedList<DispatchRequest> pendingQueue = new LinkedList<DispatchRequest>();

+        final LinkedList<Runnable> dispatchedQueue = new LinkedList<Runnable>();

         
         @Override
         protected void onReceiveString(String remoteName) {
@@ -79,10 +86,16 @@
             // to complete the routing and we don't want to have th producer
             // send us more messages than the max session protocol window
             // is configured with.
-            broker.router.route(msg, dispatchQueue, new Runnable() {
+            broker.router.route(msg, 
+                    dispatchQueue, new Runnable() {
                 public void run() {
                     BrokerConnectionState.super.onReceiveMessage(msg);
                 }
+                
+                @Override
+                public String toString() {
+                    return IntrospectionSupport.toString(BrokerConnectionState.this, "name",
"inboundSessionWindow");
+                }
             });
         }
         
@@ -92,23 +105,35 @@
         }
 
         public void onBrokerDispatch(Message message, Runnable onComplete) {
-            if( !isSessionSendBlocked() ) {
-                sessionSend(message);
-                onComplete.run();
-            } else {
-                pendingQueue.add(new DispatchRequest(message, onComplete));
-            }
+            pendingQueue.add(new DispatchRequest(message, onComplete));
+            dispatchPendingQueue();
         }
         
         @Override
         protected void onSessionResume() {
+            dispatchPendingQueue();
+        }
+
+        private void dispatchPendingQueue() {
             while( !isSessionSendBlocked() ) {
                 DispatchRequest request = pendingQueue.poll();
                 if( request==null ) {
                     return;
                 }
+                dispatchedQueue.add(request.onComplete);
                 sessionSend(request.message);
-                request.onComplete.run();
+            }
+        }
+     
+        @Override
+        protected void onReceiveFlowControl(FlowControl command) {
+            super.onReceiveFlowControl(command);
+            int credit = command.getCredit();
+            for( int i=0; i < credit; i++) {
+                Runnable onComplete = dispatchedQueue.poll();
+                if( onComplete!=null ) {
+                    onComplete.run();
+                }
             }
         }
         

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
Fri Dec 18 18:58:35 2009
@@ -30,10 +30,9 @@
 public class ConsumerConnection extends ClientConnection {
     
     private MetricAggregator totalConsumerRate;
-    private long thinkTime;
+    volatile private long thinkTime;
     private Destination destination;
     private String selector;
-    private boolean schedualWait = true;
     private final MetricCounter rate = new MetricCounter();
 
     protected void createActor() {
@@ -106,14 +105,6 @@
         this.selector = selector;
     }
 
-    public boolean isSchedualWait() {
-        return schedualWait;
-    }
-
-    public void setSchedualWait(boolean schedualWait) {
-        this.schedualWait = schedualWait;
-    }
-
     public MetricCounter getRate() {
         return rate;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java
Fri Dec 18 18:58:35 2009
@@ -22,6 +22,7 @@
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.Message.MessageBean;
 import org.apache.activemq.flow.Commands.Message.MessageBuffer;
+import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.UTF8Buffer;
 
@@ -115,7 +116,7 @@
     }
 
     public String toString() {
-        return "Message: " + message.getMsg() + " flow + " + flow + " dest: " + message.getDest();
+        return IntrospectionSupport.toString(this);
     }
 
     public long getMsgId() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
Fri Dec 18 18:58:35 2009
@@ -165,10 +165,17 @@
         client.setDestCount(1);
         client.setNumConsumers(1);
 
-        createConnections("test_1_1_1", 1);
+        createConnections("test_10_1_1", 1);
+//        setProducerThinkTime(1);
         runTestCase();
     }
 
+    private void setProducerThinkTime(int thinkTime) {
+        for( int i=0; i < client.getNumProducers(); i++ ) {
+            client.producer(i).setThinkTime(thinkTime);
+        }
+    }
+
     @Test
     public void test_1_1_10() throws Exception {
         client = new MockClient();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
Fri Dec 18 18:58:35 2009
@@ -230,6 +230,7 @@
         // Start 'em up.
         startServices();
         try {
+//            Thread.sleep(1000*1000*1000);
             reportRates();
         } finally {
             stopServices();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
Fri Dec 18 18:58:35 2009
@@ -23,6 +23,8 @@
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 
+import static java.util.concurrent.TimeUnit.*;
+
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -38,16 +40,20 @@
     private int payloadSize = 0;
     private final MetricCounter rate = new MetricCounter();
     private AtomicLong messageIdGenerator;
+    volatile private long thinkTime;
 
     protected void createActor() {
         actor = ActorProxy.create(ConnectionStateActor.class, new ProducerConnectionState(),
dispatchQueue);
     }
+    
+    private static final long MATCH_WINDOW = MILLISECONDS.toNanos(100);
 
     class ProducerConnectionState extends ClientConnectionState {
         
         private String filler;
         private int payloadCounter;
         private boolean stopped;
+        private boolean schedualed;
 
         @Override
         public void onStart() {
@@ -80,20 +86,39 @@
             super.onStop();
         }
         
+        
         private void produceMessages() {
-            while( !isSessionSendBlocked() && !stopped ) {
-                int p = priority;
-                if (priorityMod > 0) {
-                    p = payloadCounter % priorityMod == 0 ? 0 : p;
+            while( !isSessionSendBlocked() && !stopped && !schedualed ) {
+                sendMessage();
+                if( thinkTime > 0 ) {
+                    schedualNextSend();
+                    return;
                 }
+            }
+        }
 
-                Message next = new Message(messageIdGenerator.incrementAndGet(), producerId,
createPayload(), null, destination, p);
-                if (property != null) {
-                    next.setProperty(property);
+        private void schedualNextSend() {
+            schedualed=true;
+            dispatchQueue.dispatchAfter(new Runnable() {
+                public void run() {
+                   schedualed = false;
+                   produceMessages(); 
                 }
-                sessionSend(next);
-                rate.increment();
+            }, thinkTime, MILLISECONDS);
+        }
+
+        private void sendMessage() {
+            int p = priority;
+            if (priorityMod > 0) {
+                p = payloadCounter % priorityMod == 0 ? 0 : p;
             }
+
+            Message next = new Message(messageIdGenerator.incrementAndGet(), producerId,
createPayload(), null, destination, p);
+            if (property != null) {
+                next.setProperty(property);
+            }
+            sessionSend(next);
+            rate.increment();
         }
         
         private String createPayload() {
@@ -184,6 +209,14 @@
     public void setMessageIdGenerator(AtomicLong messageIdGenerator) {
         this.messageIdGenerator = messageIdGenerator;
     }
+
+    public long getThinkTime() {
+        return thinkTime;
+    }
+
+    public void setThinkTime(long thinkTime) {
+        this.thinkTime = thinkTime;
+    }
     
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java
Fri Dec 18 18:58:35 2009
@@ -33,8 +33,10 @@
     final void route(Message msg, DispatchQueue queue, Runnable onRouteCompleted) {
         AsciiBuffer key = msg.getDestination().getName();
         Collection<DeliveryTarget> targets = lookupTable.get(key);
-        if( targets == null ) 
+        if( targets == null )  {
+            onRouteCompleted.run();
             return;
+        }
         
         Runnable r = runOnceAfter(queue, onRouteCompleted, targets.size());
         for (DeliveryTarget dt : targets) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java
Fri Dec 18 18:58:35 2009
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.queue.actor.perf;
 
+import org.apache.activemq.util.IntrospectionSupport;
+
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -49,5 +51,10 @@
     int maxSize() {
         return maxSize;
     }
+    
+    @Override
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java
Fri Dec 18 18:58:35 2009
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.queue.actor.perf;
 
+import org.apache.activemq.util.IntrospectionSupport;
+
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -29,6 +31,16 @@
     public WindowLimiter() {
         this.closed = true;
     }
+    
+    int opensAt() {
+        return opensAt;
+    }
+    
+    WindowLimiter opensAt(int opensAt) {
+        this.opensAt = opensAt;
+        closed = size < opensAt ;
+        return this;
+    }
 
     int size() {
         return size;
@@ -36,6 +48,7 @@
     
     WindowLimiter size(int size) {
         this.size = size;
+        closed = size < opensAt ;
         return this;
     }
     
@@ -56,4 +69,9 @@
         }
     }
 
+    @Override
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=892348&r1=892347&r2=892348&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
(original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
Fri Dec 18 18:58:35 2009
@@ -311,7 +311,7 @@
                     buffer.append("  ");
                     buffer.append(entry.getKey());
                     buffer.append(": ");
-                    buffer.append(entry.getValue());
+                    buffer.append(StringSupport.indent(entry.getValue(), 2));
                 }
                 buffer.append("\n}");
             } else {
@@ -326,7 +326,7 @@
                     buffer.append(entry.getKey());
                     buffer.append(": ");
                     String value = entry.getValue();
-                    buffer.append(StringSupport.indent(value, 2));
+                    buffer.append(value);
                 }
                 buffer.append("}");
             }
@@ -356,7 +356,7 @@
         Field[] fields = startClass.getDeclaredFields();
         for (int i = 0; i < fields.length; i++) {
             Field field = fields[i];
-            if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())
) {
+            if (Modifier.isStatic(field.getModifiers())) {
                 continue;
             }
 



Mime
View raw message