Author: chirino
Date: Thu Dec 17 20:21:04 2009
New Revision: 891866
URL: http://svn.apache.org/viewvc?rev=891866&view=rev
Log:
adding a broker use case perf benchmark to exercise the actor thread model.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java
- copied, changed from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java
- copied, changed from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Router.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/Transport.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportFactorySystem.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportHandler.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServer.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/TransportServerHandler.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/Actor.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/Message.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java (from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java&r1=891451&r2=891866&rev=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/AsmActor.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/actor/ActorProxy.java Thu Dec 17 20:21:04 2009
@@ -16,7 +16,7 @@
import static org.objectweb.asm.ClassWriter.*;
-public class AsmActor implements Opcodes {
+public class ActorProxy {
public static <T> T create(Class<T> interfaceClass, T target, DispatchQueue queue) throws IllegalArgumentException {
return create(target.getClass().getClassLoader(), interfaceClass, target, queue);
@@ -46,10 +46,10 @@
}
static private String proxyName(Class<?> clazz) {
- return "org.apache.activemq.actor.generated."+clazz.getName();
+ return clazz.getName()+"$__ACTOR_PROXY__";
}
- private static final class Generator<T> {
+ private static final class Generator<T> implements Opcodes {
private static final String RUNNABLE = "java/lang/Runnable";
private static final String OBJECT_CLASS = "java/lang/Object";
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java Thu Dec 17 20:21:04 2009
@@ -17,6 +17,10 @@
package org.apache.activemq.dispatch;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public enum DispatchOption {
/**
* Updates the target queue to be the
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchPriority.java Thu Dec 17 20:21:04 2009
@@ -17,6 +17,10 @@
package org.apache.activemq.dispatch;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public enum DispatchPriority {
HIGH,
DEFAULT,
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java Thu Dec 17 20:21:04 2009
@@ -19,7 +19,10 @@
import java.nio.channels.SelectableChannel;
-
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public interface Dispatcher extends Retained {
public DispatchQueue getGlobalQueue();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java Thu Dec 17 20:21:04 2009
@@ -1,11 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.dispatch;
/**
* Handy interface to signal classes which would like an Dispatcher instance
* injected into them.
- *
- * @author chirino
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface DispatcherAware {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java Thu Dec 17 20:21:04 2009
@@ -19,6 +19,10 @@
import org.apache.activemq.dispatch.internal.simple.DispatcherThread;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public interface DispatcherObserver {
public void onThreadCreate(DispatcherThread thread);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Thu Dec 17 20:21:04 2009
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.dispatch.internal;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.activemq.dispatch.DispatchObject;
import org.apache.activemq.dispatch.DispatchQueue;
@@ -27,6 +29,7 @@
protected volatile Object context;
protected volatile DispatchQueue targetQueue;
+ protected final AtomicInteger suspendCounter = new AtomicInteger();
@SuppressWarnings("unchecked")
public <Context> Context getContext() {
@@ -45,5 +48,17 @@
return this.targetQueue;
}
+ public void resume() {
+ if( suspendCounter.decrementAndGet() == 0 ) {
+ onResume();
+ }
+ }
+
+ public void suspend() {
+ suspendCounter.incrementAndGet();
+ }
+
+ protected void onResume() {
+ }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java Thu Dec 17 20:21:04 2009
@@ -36,7 +36,6 @@
abstract public class AbstractSerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
protected final String label;
- protected final AtomicInteger suspendCounter = new AtomicInteger();
protected final AtomicInteger executeCounter = new AtomicInteger();
protected final AtomicLong externalQueueSize = new AtomicLong();
@@ -64,14 +63,9 @@
return label;
}
- public void resume() {
- if( suspendCounter.decrementAndGet() == 0 ) {
- dispatchSelfAsync();
- }
- }
-
- public void suspend() {
- suspendCounter.incrementAndGet();
+ @Override
+ protected void onResume() {
+ dispatchSelfAsync();
}
public void execute(Runnable command) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java Thu Dec 17 20:21:04 2009
@@ -1,12 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.dispatch.internal;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public class BaseRetained {
final protected AtomicInteger retainCounter = new AtomicInteger(0);
- final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>();
+ final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>(1);
public void addShutdownWatcher(Runnable shutdownHandler) {
synchronized(shutdownHandlers) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableCountDownLatch.java Thu Dec 17 20:21:04 2009
@@ -19,6 +19,10 @@
import java.util.concurrent.CountDownLatch;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public class RunnableCountDownLatch extends CountDownLatch implements Runnable {
public RunnableCountDownLatch(int count) {
super(count);
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/RunnableSupport.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class RunnableSupport {
+
+ private static Runnable NO_OP = new Runnable() {
+ public void run() {
+ }
+ };
+
+ public static Runnable runNoop() {
+ return NO_OP;
+ }
+
+ public static Runnable runOnceAfter(final Runnable runnable, int count) {
+ if( count <= 0 || runnable==null ) {
+ return NO_OP;
+ }
+ if( count == 1 ) {
+ return runnable;
+ }
+ final AtomicInteger counter = new AtomicInteger(count);
+ return new Runnable() {
+ public void run() {
+ if( counter.decrementAndGet()==0 ) {
+ runnable.run();
+ }
+ }
+ };
+ }
+
+ public static Runnable runAfter(final Runnable runnable, int count) {
+ if( count <= 0 || runnable==null ) {
+ return NO_OP;
+ }
+ if( count == 1 ) {
+ return runnable;
+ }
+ final AtomicInteger counter = new AtomicInteger(count);
+ return new Runnable() {
+ public void run() {
+ if( counter.decrementAndGet()<=0 ) {
+ runnable.run();
+ }
+ }
+ };
+ }
+
+ public static Runnable runOnceAfter(final DispatchQueue queue, final Runnable runnable, int count) {
+ if( count <= 0 || runnable==null ) {
+ return NO_OP;
+ }
+ final AtomicInteger counter = new AtomicInteger(count);
+ return new Runnable() {
+ public void run() {
+ if( counter.decrementAndGet()==0 ) {
+ queue.dispatchAsync(runnable);
+ }
+ }
+ };
+ }
+
+ public static Runnable runAfter(final DispatchQueue queue, final Runnable runnable, int count) {
+ if( count <= 0 || runnable==null ) {
+ return NO_OP;
+ }
+ final AtomicInteger counter = new AtomicInteger(count);
+ return new Runnable() {
+ public void run() {
+ if( counter.decrementAndGet()<=0 ) {
+ queue.dispatchAsync(runnable);
+ }
+ }
+ };
+ }
+
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/IntegerCounter.java Thu Dec 17 20:21:04 2009
@@ -1,10 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.dispatch.internal.simple;
-
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ *
+ */
public class IntegerCounter {
int counter;
+ public IntegerCounter() {
+ }
+
+ public IntegerCounter(int count) {
+ this.counter = count;
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Thu Dec 17 20:21:04 2009
@@ -24,6 +24,10 @@
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public final class SerialDispatchQueue extends AbstractSerialDispatchQueue implements SimpleQueue {
private final SimpleDispatcher dispatcher;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java Thu Dec 17 20:21:04 2009
@@ -1,8 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.dispatch.internal.simple;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchPriority;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ *
+ */
public interface SimpleQueue extends DispatchQueue {
DispatchPriority getPriority();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java Thu Dec 17 20:21:04 2009
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.dispatch.internal.simple;
import java.util.ArrayList;
@@ -8,6 +24,10 @@
import static org.apache.activemq.dispatch.internal.simple.TimerThread.Type.*;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
final public class TimerThread extends Thread {
enum Type {
RELATIVE,
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorBenchmark.java Thu Dec 17 20:21:04 2009
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.actor;
import java.util.concurrent.TimeUnit;
@@ -7,26 +23,21 @@
import static java.lang.String.*;
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public class ActorBenchmark {
public static class PizzaService implements IPizzaService {
long counter;
- @Message
public void order(long count) {
counter += count;
}
}
@Test
- public void benchmarkCGLibProxy() throws Exception {
- String name = "cglib proxy";
- PizzaService service = new PizzaService();
- IPizzaService proxy = Actor.create(service, createQueue());
- benchmark(name, service, proxy);
- }
-
- @Test
public void benchmarkCustomProxy() throws Exception {
String name = "custom proxy";
PizzaService service = new PizzaService();
@@ -38,7 +49,7 @@
public void benchmarkAsmProxy() throws Exception {
String name = "asm proxy";
PizzaService service = new PizzaService();
- IPizzaService proxy = AsmActor.create(IPizzaService.class, service, createQueue());
+ IPizzaService proxy = ActorProxy.create(IPizzaService.class, service, createQueue());
benchmark(name, service, proxy);
}
@@ -47,12 +58,8 @@
public void dispatchAsync(Runnable runnable) {
runnable.run();
}
-
public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
- throw new RuntimeException("TODO: implement me.");
-
}
-
};
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java (from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java&r1=891451&r2=891866&rev=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/AsmActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorProxyTest.java Thu Dec 17 20:21:04 2009
@@ -1,23 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.actor;
import java.util.concurrent.TimeUnit;
-import junit.framework.Assert;
-
import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
import org.junit.Test;
import static junit.framework.Assert.*;
-import static junit.framework.Assert.*;
-
-import static junit.framework.Assert.*;
-
-import static junit.framework.Assert.*;
-
-import static junit.framework.Assert.*;
-
-public class AsmActorTest {
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ActorProxyTest {
public static interface TestInterface {
void strings(String value, String[] value2);
@@ -65,7 +75,7 @@
}
};
- proxy = AsmActor.create(TestInterface.class, service, createQueue());
+ proxy = ActorProxy.create(TestInterface.class, service, createQueue());
proxy.strings(expected1, expected2);
}
@@ -83,7 +93,7 @@
}
};
- proxy = AsmActor.create(TestInterface.class, service, createQueue());
+ proxy = ActorProxy.create(TestInterface.class, service, createQueue());
proxy.shorts(expected1, expected2);
}
@@ -97,7 +107,7 @@
}
};
- proxy = AsmActor.create(TestInterface.class, service, createQueue());
+ proxy = ActorProxy.create(TestInterface.class, service, createQueue());
String actual = proxy.returnString();
assertNull(actual);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Thu Dec 17 20:21:04 2009
@@ -17,52 +17,44 @@
import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
-import static org.apache.activemq.dispatch.DispatchOption.*;
-
-
-/**
+/**
* ActorTest
* <p>
* Description:
* </p>
+ *
* @author cmacnaug
* @version 1.0
*/
public class ActorTest extends TestCase {
- public static class ActorTestObject
- {
- @Message
- public void actorInvocation(CountDownLatch latch)
- {
- latch.countDown();
- }
-
- public void straightThrough(CountDownLatch latch)
- {
+ interface TestObjectActor {
+ public void actorInvocation(CountDownLatch latch);
+ }
+
+ public static class TestObject implements TestObjectActor {
+ public void actorInvocation(CountDownLatch latch) {
latch.countDown();
}
-
}
-
- public void testActorInvocation() throws Exception
- {
+
+ public void testActorInvocation() throws Exception {
Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
advancedSystem.retain();
-
- DispatchQueue queue = advancedSystem.createSerialQueue("test", STICK_TO_CALLER_THREAD);
- ActorTestObject testObject = Actor.create(new ActorTestObject(), queue);
-
+
+ DispatchQueue queue = advancedSystem.createSerialQueue("test");
+ TestObjectActor actor = ActorProxy.create(TestObjectActor.class, new TestObject(), queue);
+
CountDownLatch latch = new CountDownLatch(1);
- testObject.actorInvocation(latch);
+ actor.actorInvocation(latch);
assertTrue(latch.await(1, TimeUnit.SECONDS));
-
+
queue.suspend();
latch = new CountDownLatch(1);
- testObject.actorInvocation(latch);
+ actor.actorInvocation(latch);
assertFalse("Suspended Queue shouldn't invoked method", latch.await(2, TimeUnit.SECONDS));
-
+
queue.resume();
assertTrue("Resumed Queue should invoke method", latch.await(2, TimeUnit.SECONDS));
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/IPizzaService.java Thu Dec 17 20:21:04 2009
@@ -17,7 +17,9 @@
package org.apache.activemq.actor;
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public interface IPizzaService {
- @Message
public void order(long count);
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java?rev=891866&r1=891865&r2=891866&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/PizzaServiceCustomProxy.java Thu Dec 17 20:21:04 2009
@@ -19,6 +19,9 @@
import org.apache.activemq.dispatch.DispatchQueue;
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
public class PizzaServiceCustomProxy implements IPizzaService {
private final DispatchQueue queue;
private final IPizzaService target;
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.internal.BaseRetained;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.queue.actor.transport.TransportHandler;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class BaseConnection extends BaseRetained {
+
+ protected interface Protocol extends TransportHandler { // operations invoked on the connection's actor, in addition to the transport callbacks
+ void start(); // begin protocol processing; ProtocolImpl attaches itself to the transport and resumes it here
+ void shutdown(Runnable onShutdown); // begin shutdown; ProtocolImpl runs onShutdown once the protocol is disconnected
+ }
+
+ protected String name;
+ protected Dispatcher dispatcher;
+
+ protected DispatchQueue dispatchQueue;
+ protected Protocol actor;
+
+
+ @Override
+ protected void startup() {
+ super.startup();
+ dispatchQueue = dispatcher.createSerialQueue(name); // serial queue named after this connection; subclasses hand it to the actor proxy
+ createActor(); // subclass hook: expected to assign 'actor' before it is used on the next line
+ actor.start();
+ }
+
+ @Override
+ protected void shutdown() {
+ actor.shutdown(new Runnable() { // the base-class shutdown is deferred until the protocol invokes this callback
+ public void run() {
+ // notifies registered shutdown handlers
+ BaseConnection.super.shutdown();
+ }
+ });
+ }
+
+ public static class WindowController extends WindowLimiter {
+
+     private int maxSize;
+     private int processed;
+     private int creditsAt;
+
+     /**
+      * Accumulates processed units. Once the running total reaches the
+      * credit threshold (half of the configured max size) the whole total
+      * is added back to the window in one batch and the total is reset.
+      *
+      * @param count number of units just processed
+      * @return the batch that was credited back to the window, or 0 when
+      *         the threshold has not been reached yet
+      */
+     public int processed(int count) {
+         processed += count;
+         if( processed < creditsAt ) {
+             return 0;
+         }
+         final int credits = processed;
+         change(credits);
+         processed = 0;
+         return credits;
+     }
+
+     /**
+      * Sets a new max size, moves the credit threshold to half of it and
+      * applies the difference to the window.
+      *
+      * @param newMaxSize the new maximum window size
+      * @return the difference between the new and the previous max size
+      */
+     int maxSize(int newMaxSize) {
+         final int delta = newMaxSize - maxSize;
+         maxSize = newMaxSize;
+         creditsAt = newMaxSize / 2;
+         change(delta);
+         return delta;
+     }
+
+     /** @return the currently configured max size */
+     int maxSize() {
+         return this.maxSize;
+     }
+
+ }
+
+ public static class WindowLimiter {
+
+     private int opensAt = 1;
+     private int size;
+     // A limiter starts out closed; only a positive change() can open it.
+     private boolean closed = true;
+
+     public WindowLimiter() {
+     }
+
+     /** @return the current window size counter */
+     int size() {
+         return this.size;
+     }
+
+     /**
+      * Overwrites the size counter. The open/closed flag is not touched.
+      *
+      * @param size the new counter value
+      * @return this limiter, for chaining
+      */
+     WindowLimiter size(int size) {
+         this.size = size;
+         return this;
+     }
+
+     public boolean isOpen() {
+         return !closed;
+     }
+
+     public boolean isClosed() {
+         return closed;
+     }
+
+     /**
+      * Adjusts the size counter by the given amount and updates the
+      * open/closed flag: a closed window opens on a positive change that
+      * brings the size up to the open threshold; an open window closes on
+      * a negative change that brings the size to zero or below.
+      *
+      * @param change amount to add to the size (may be negative)
+      */
+     public void change(int change) {
+         size += change;
+         if( closed ) {
+             if( change > 0 && size >= opensAt ) {
+                 closed = false;
+             }
+         } else if( change < 0 && size <= 0 ) {
+             closed = true;
+         }
+     }
+
+ }
+
+ abstract protected void createActor();
+
+ // The actor pattern ensures that this object is only accessed in
+ // a serial execution context, so synchronization is NOT required.
+ // It also places a restriction that all operations should
+ // avoid mutex contention and avoid blocking IO calls.
+ protected class ProtocolImpl implements Protocol {
+
+ final protected WindowController inboundSessionWindow = new WindowController(); // sized in onConnect(); its credits are sent to the peer as FlowControl
+ final protected WindowLimiter outboundSessionWindow = new WindowLimiter(); // opened by FlowControl credits received from the peer
+ final protected WindowLimiter outboundTransportWindow = new WindowLimiter(); // debited per transportSend(), credited when the send completes
+
+ protected Transport transport; // must be assigned (by a subclass start()) before start() below runs
+ protected Runnable onShutdown;
+ protected boolean disconnected;
+ protected Exception failure; // last failure passed to onFailure()
+
+ ProtocolImpl() {
+ outboundTransportWindow.size(100); // size(int) only sets the counter; the limiter stays closed until a later positive change()
+ }
+
+ public void start() {
+
+ transport.setTargetQueue(dispatchQueue);
+ transport.setHandler(this);
+ transport.resume();
+ }
+
+ public void shutdown(Runnable onShutdown) {
+ if( disconnected ) {
+ onShutdown.run();
+ } else {
+ this.onShutdown = onShutdown; // remembered so onDisconnect() can run it later
+ transport.release();
+ }
+ }
+
+ public void onConnect() {
+ sendFlowControl(inboundSessionWindow.maxSize(1000)); // advertise the initial inbound window to the peer
+ }
+
+ public void onDisconnect() {
+ disconnected = true;
+ if( onShutdown!=null ) {
+ shutdown(onShutdown); // runs the callback immediately because 'disconnected' is now true
+ onShutdown=null;
+ }
+ }
+
+ public void onFailure(Exception failure) {
+ failure.printStackTrace();
+ this.failure = failure;
+ }
+
+ public void onRecevie(Object command) { // dispatches on the exact runtime class of the command
+ if (command.getClass() == Message.class) {
+ // We should not be getting messages
+ // when the window is closed..
+ if( inboundSessionWindow.isClosed() ) {
+ onFailure(new Exception("Session overrun: " + command)); // only records the failure; the message is still processed below
+ }
+ outboundSessionWindow.change(-1); // NOTE(review): debits the *outbound* window for an inbound message, right after checking the inbound one - confirm inboundSessionWindow was not intended
+ onReceiveMessage((Message) command);
+ } else if (command.getClass() == FlowControlBean.class || command.getClass() == FlowControlBuffer.class) {
+ onReceiveFlowControl((FlowControl) command);
+ } else if (command.getClass() == String.class) {
+ onReceiveString((String)command);
+ } else if (command.getClass() == DestinationBuffer.class || command.getClass() == DestinationBean.class) {
+ onReceiveDestination((Destination)command);
+ } else {
+ onFailure(new Exception("Unrecognized command: " + command));
+ }
+ }
+
+ public void sessionSend(Message message) {
+ transportSend(message); // no session-window accounting is done here
+ }
+
+ protected void onReceiveDestination(Destination command) { // no-op hook for subclasses
+ }
+
+ protected void onReceiveString(String command) { // no-op hook for subclasses
+ }
+
+ protected void onReceiveMessage(Message msg) {
+ sendFlowControl(inboundSessionWindow.processed(1)); // processed() returns 0 until a batch of credits is due
+ }
+
+ private void sendFlowControl(int credits) {
+ if( credits!=0 ) { // zero credits: nothing is sent
+ FlowControlBean fc = new FlowControlBean();
+ fc.setCredit(credits);
+ transportSend(fc);
+ }
+ }
+
+ public void transportSend(Object message) {
+ outboundTransportWindow.change(-1); // sends unconditionally; callers check isSessionSendBlocked() themselves
+ transport.send(message, onSendCompleted, dispatchQueue);
+ }
+
+ private final Runnable onSendCompleted = new Runnable() {
+ public void run() {
+ boolean wasClosed = outboundTransportWindow.isClosed();
+ outboundTransportWindow.change(1);
+ if( !wasClosed && !isSessionSendBlocked() ) { // NOTE(review): onReceiveFlowControl() resumes on 'wasClosed'; here the test is '!wasClosed' - confirm which is intended
+ onSessionResume();
+ }
+ }
+ };
+
+ protected void onReceiveFlowControl(FlowControl command) {
+ boolean wasClosed = outboundSessionWindow.isClosed();
+ outboundSessionWindow.change(command.getCredit());
+ if( wasClosed && !isSessionSendBlocked() ) { // resume only when the window was closed before the credits were applied
+ onSessionResume();
+ }
+
+ }
+
+ protected boolean isSessionSendBlocked() { // blocked while either outbound window is closed
+ return outboundTransportWindow.isClosed() || outboundSessionWindow.isClosed();
+ }
+
+ protected void onSessionResume() { // no-op hook for subclasses
+ }
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setDispatcher(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.LinkedList;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.queue.actor.transport.Transport;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BrokerConnection extends BaseConnection implements DeliveryTarget {
+
+ interface BrokerProtocol extends Protocol {
+ public void onBrokerDispatch(Message msg, Runnable r);
+ }
+
+ public static class DispatchRequest {
+
+ private final Message message;
+ private final Runnable onComplete;
+
+ public DispatchRequest(Message message, Runnable onComplete) {
+ this.message = message;
+ this.onComplete = onComplete;
+ }
+
+ }
+
+ private MockBroker broker;
+ private BrokerProtocol brokerActor;
+ private Transport transport;
+ private int priorityLevels;
+
+ protected void createActor() {
+ actor = brokerActor = ActorProxy.create(BrokerProtocol.class, new BrokerProtocolImpl(), dispatchQueue);
+ }
+
+ protected class BrokerProtocolImpl extends ProtocolImpl implements BrokerProtocol {
+
+ String name; // shadows BaseConnection.name; assigned in onReceiveString() and not read within this class
+
+ @Override
+ public void start() {
+ this.transport = BrokerConnection.this.transport; // use the transport injected via setTransport() before ProtocolImpl.start() configures it
+ super.start();
+ }
+
+ // TODO: to increase fairness: we might want to have a pendingQueue per sender
+ final LinkedList<DispatchRequest> pendingQueue = new LinkedList<DispatchRequest>();
+
+ @Override
+ protected void onReceiveString(String remoteName) {
+ name = "broker->"+remoteName;
+ }
+
+ @Override
+ protected void onReceiveMessage(final Message msg) {
+ // We don't dish out flow control credit until the broker
+ // lets us know that the message routing completed.
+ // In the slow consumer case, it could take a while for him
+ // to complete the routing and we don't want to have the producer
+ // send us more messages than the max session protocol window
+ // is configured with.
+ broker.router.route(msg, dispatchQueue, new Runnable() {
+ public void run() {
+ BrokerProtocolImpl.super.onReceiveMessage(msg); // base-class handling is what issues the flow-control credit
+ }
+ });
+ }
+
+ @Override
+ protected void onReceiveDestination(Destination destination) {
+ broker.subscribe(destination, BrokerConnection.this);
+ }
+
+ public void onBrokerDispatch(Message message, Runnable onComplete) {
+ if( !isSessionSendBlocked() ) {
+ sessionSend(message);
+ onComplete.run();
+ } else {
+ pendingQueue.add(new DispatchRequest(message, onComplete)); // parked until onSessionResume() drains the queue
+ }
+ }
+
+ @Override
+ protected void onSessionResume() {
+ while( !isSessionSendBlocked() ) { // drain in FIFO order until a window closes again or the queue is empty
+ DispatchRequest request = pendingQueue.poll();
+ if( request==null ) {
+ return;
+ }
+ sessionSend(request.message);
+ request.onComplete.run();
+ }
+ }
+
+ }
+
+ public void add(Message msg, Runnable r) {
+ brokerActor.onBrokerDispatch(msg, r);
+ }
+
+ public boolean hasSelector() {
+ return false;
+ }
+
+ public boolean match(Message message) {
+ return true;
+ }
+
+ public MockBroker getBroker() {
+ return broker;
+ }
+
+ public void setBroker(MockBroker broker) {
+ this.broker = broker;
+ }
+
+ public Transport getTransport() {
+ return transport;
+ }
+
+ public void setTransport(Transport transport) {
+ this.transport = transport;
+ }
+
+ public int getPriorityLevels() {
+ return priorityLevels;
+ }
+
+ public void setPriorityLevels(int priorityLevels) {
+ this.priorityLevels = priorityLevels;
+ }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.queue.actor.transport.TransportFactorySystem;
+
+/**
+ * Client side of a connection. {@link #createActor()} wraps a
+ * {@link ClientProtocolImpl} in an actor proxy bound to the connection's
+ * dispatch queue; the protocol opens its own transport from
+ * {@link #connectUri} when it is started.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ClientConnection extends BaseConnection {
+
+    /** URI passed to TransportFactorySystem.connect() when the protocol starts. */
+    protected String connectUri;
+
+    /**
+     * @param uri the URI to connect to; read when the protocol is started
+     */
+    public void setConnectUri(String uri) {
+        this.connectUri = uri;
+    }
+
+    @Override
+    protected void createActor() {
+        actor = ActorProxy.create(Protocol.class, new ClientProtocolImpl(), dispatchQueue);
+    }
+
+    /**
+     * Protocol that creates the transport itself and sends the connection
+     * name to the peer once connected.
+     */
+    protected class ClientProtocolImpl extends ProtocolImpl {
+
+        @Override
+        public void start() {
+            // The transport has to exist before ProtocolImpl.start() configures it.
+            transport = TransportFactorySystem.connect(dispatcher, connectUri);
+            super.start();
+        }
+
+        @Override
+        public void onConnect() {
+            super.onConnect();
+            // Send the connection name as a plain String command.
+            super.transportSend(name);
+        }
+
+    }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.actor.ActorProxy;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ConsumerConnection extends ClientConnection {
+
+ private MetricAggregator totalConsumerRate;
+ private long thinkTime;
+ private Destination destination;
+ private String selector;
+ private boolean schedualWait = true;
+ private final MetricCounter rate = new MetricCounter();
+
+ protected void createActor() {
+ actor = ActorProxy.create(Protocol.class, new ProducerProtocolImpl(), dispatchQueue);
+ }
+
+ class ProducerProtocolImpl extends ClientProtocolImpl { // NOTE(review): named "Producer..." although it is the protocol of ConsumerConnection - looks like a copy/paste leftover; confirm before renaming
+
+ @Override
+ public void start() {
+ rate.name("Consumer " + name + " Rate");
+ totalConsumerRate.add(rate); // no null check: setTotalConsumerRate() must have been called before startup
+ super.start();
+ }
+
+ @Override
+ public void onConnect() {
+ super.onConnect();
+ transportSend(destination); // sent after the connection name that ClientProtocolImpl.onConnect() sends
+ }
+
+ @Override
+ protected void onReceiveMessage(final Message msg) {
+ if (thinkTime > 0) {
+ dispatchQueue.dispatchAfter(new Runnable() { // count the message and run the base-class handling only after thinkTime milliseconds
+ public void run() {
+ rate.increment();
+ ProducerProtocolImpl.super.onReceiveMessage(msg);
+ }
+ }, thinkTime, TimeUnit.MILLISECONDS);
+
+ } else {
+ rate.increment(); // no think time configured: count and handle immediately
+ super.onReceiveMessage(msg);
+ }
+ }
+
+
+ }
+
+ public MetricAggregator getTotalConsumerRate() {
+ return totalConsumerRate;
+ }
+
+ public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
+ this.totalConsumerRate = totalConsumerRate;
+ }
+
+ public long getThinkTime() {
+ return thinkTime;
+ }
+
+ public void setThinkTime(long thinkTime) {
+ this.thinkTime = thinkTime;
+ }
+
+ public Destination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(Destination destination) {
+ this.destination = destination;
+ }
+
+ public String getSelector() {
+ return selector;
+ }
+
+ public void setSelector(String selector) {
+ this.selector = selector;
+ }
+
+ public boolean isSchedualWait() {
+ return schedualWait;
+ }
+
+ public void setSchedualWait(boolean schedualWait) {
+ this.schedualWait = schedualWait;
+ }
+
+ public MetricCounter getRate() {
+ return rate;
+ }
+
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DeliveryTarget.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.perf;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DeliveryTarget { // a destination for messages; BrokerConnection implements it
+
+ /**
+ * @return true if this sub has a selector
+ */
+ public boolean hasSelector();
+
+ public boolean match(Message message); // whether this target accepts the message (BrokerConnection always returns true)
+
+ public void add(Message msg, Runnable r); // hand msg to the target; BrokerConnection runs r after it has sent the message
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/Message.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.io.Serializable;
+
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.buffer.UTF8Buffer;
+
+/**
+ * Benchmark message: a thin wrapper around a {@link MessageBuffer} plus the
+ * {@link Flow} the message is associated with.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Message implements Serializable {
+
+    private static final long serialVersionUID = 6759761889075451996L;
+
+    /** Maps a message to the priority stored in its buffer. */
+    public static final Mapper<Integer, Message> PRIORITY_MAPPER = new Mapper<Integer, Message>() {
+        public Integer map(Message element) {
+            return element.getPriority();
+        }
+    };
+
+    public static final int MAX_USER_PRIORITY = 10;
+    /** One level above {@link #MAX_USER_PRIORITY}. */
+    public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
+    /** Same value as {@link #MAX_PRIORITY}. */
+    public static final int SYSTEM_PRIORITY = MAX_PRIORITY;
+
+    // Message type codes; this class always reports TYPE_NORMAL from type().
+    public static final short TYPE_NORMAL = 0;
+    public static final short TYPE_FLOW_CONTROL = 1;
+    public static final short TYPE_FLOW_OPEN = 2;
+    public static final short TYPE_FLOW_CLOSE = 3;
+
+    /** Not serialized: the field is transient. */
+    transient Flow flow;
+    /** Buffer holding the message fields; replaced by a new frozen copy on every mutation. */
+    private MessageBuffer message;
+
+    /**
+     * Builds the underlying buffer from the individual fields and freezes it.
+     *
+     * @param msgId      message id
+     * @param producerId id of the producer
+     * @param msg        payload text, stored as a {@link UTF8Buffer}
+     * @param flow       flow associated with the message
+     * @param dest       destination of the message
+     * @param priority   message priority
+     */
+    Message(long msgId, int producerId, String msg, Flow flow, Destination dest, int priority) {
+        MessageBean message = new MessageBean();
+        message.setMsgId(msgId);
+        message.setProducerId(producerId);
+        message.setMsg(new UTF8Buffer(msg));
+        message.setDest(dest);
+        message.setPriority(priority);
+        this.message = message.freeze();
+        this.flow = flow;
+    }
+
+    /**
+     * Copy constructor; the new instance shares the source's buffer and flow.
+     *
+     * @param m message to copy
+     */
+    Message(Message m) {
+        this.message = m.message;
+        this.flow = m.flow;
+    }
+
+    /**
+     * Wraps an existing buffer. The flow is left {@code null}.
+     *
+     * @param m buffer to wrap
+     */
+    public Message(MessageBuffer m) {
+        this.message = m;
+    }
+
+    /**
+     * @return the message type; always {@link #TYPE_NORMAL} in this class
+     */
+    public short type() {
+        return TYPE_NORMAL;
+    }
+
+    /**
+     * Adds a property to the message by copying the buffer, appending the
+     * property and freezing the copy again.
+     *
+     * @param matchProp property to add
+     */
+    public void setProperty(String matchProp) {
+        message = message.copy().addProperty(matchProp).freeze();
+    }
+
+    /**
+     * @param matchProp property to look for
+     * @return true if the message has properties and its property list
+     *         contains {@code matchProp}
+     */
+    public boolean match(String matchProp) {
+        if (!message.hasProperty()) {
+            return false;
+        }
+        return message.getPropertyList().contains(matchProp);
+    }
+
+    public boolean isSystem() {
+        return false;
+    }
+
+    /**
+     * Increases the hop count by one.
+     */
+    public void incrementHopCount() {
+        // Mutations go through copy()/freeze(), so store the new count in a
+        // copy. The "+ 1" matters: writing back getHopCount() alone is a no-op.
+        message = message.copy().setHopCount(message.getHopCount() + 1).freeze();
+    }
+
+    public final int getHopCount() {
+        return message.getHopCount();
+    }
+
+    public final Destination getDestination() {
+        return message.getDest();
+    }
+
+    public Flow getFlow() {
+        return flow;
+    }
+
+    /**
+     * @return always 1
+     */
+    public int getFlowLimiterSize() {
+        return 1;
+    }
+
+    public int getPriority() {
+        return message.getPriority();
+    }
+
+    @Override
+    public String toString() {
+        return "Message: " + message.getMsg() + " flow + " + flow + " dest: " + message.getDest();
+    }
+
+    public long getMsgId() {
+        return message.getMsgId();
+    }
+
+    public int getProducerId() {
+        return message.getProducerId();
+    }
+
+    /**
+     * @return the underlying buffer
+     */
+    public MessageBuffer getProto() {
+        return message;
+    }
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MessageGenerator.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+/**
+ * A source of {@link Message} instances that can be polled and that accepts
+ * listeners to be told when a message is ready.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface MessageGenerator {
+
+    /**
+     * Callback registered through
+     * {@link MessageGenerator#addMessageReadyListener(MessageReadyListener)}.
+     */
+    interface MessageReadyListener {
+
+        /**
+         * Invoked with a message that is ready.
+         *
+         * @param m the ready message
+         */
+        void onMessageReady(Message m);
+    }
+
+    /**
+     * Registers a listener for message-ready notifications.
+     *
+     * @param listener the listener to register
+     */
+    void addMessageReadyListener(MessageReadyListener listener);
+
+    /**
+     * Polls this generator for a message.
+     *
+     * @return the polled message; whether {@code null} is returned when none
+     *         is available is up to the implementation -- TODO confirm
+     */
+    Message pollMessage();
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java?rev=891866&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java Thu Dec 17 20:21:04 2009
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherConfig;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.queue.actor.transport.TransportFactorySystem;
+import org.apache.activemq.queue.actor.transport.TransportServer;
+import org.apache.activemq.queue.actor.transport.TransportServerHandler;
+
+/**
+ * Mock broker for the actor perf benchmark. It accepts transport
+ * connections, keeps the set of queues and routes subscriptions either to a
+ * queue or to the router.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MockBroker implements TransportServerHandler {
+
+    final Router router = new Router();
+
+    /** Connections created in {@link #onAccept(Transport)}. */
+    final ArrayList<BrokerConnection> connections = new ArrayList<BrokerConnection>();
+    /** Retained on start and released on stop; never populated by this class. */
+    final ArrayList<BrokerConnection> brokerConnections = new ArrayList<BrokerConnection>();
+    final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
+
+    private TransportServer transportServer;
+    private String uri;
+    private String name;
+    protected Dispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean();
+    private boolean useInputQueues = false;
+
+    private DispatchQueue brokerDispatchQueue;
+
+    public boolean isUseInputQueues() {
+        return useInputQueues;
+    }
+
+    public void setUseInputQueues(boolean useInputQueues) {
+        this.useInputQueues = useInputQueues;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Subscribes a target to a destination. Destinations flagged as ptp are
+     * served by the queue registered through {@link #addQueue(MockQueue)};
+     * all others are bound directly on the router.
+     *
+     * @param destination    destination to subscribe to
+     * @param deliveryTarget target that should receive the messages
+     * @throws IllegalArgumentException if the destination is ptp and no queue
+     *         has been added for it
+     */
+    public void subscribe(Destination destination, DeliveryTarget deliveryTarget) {
+        if (destination.getPtp()) {
+            MockQueue queue = queues.get(destination);
+            if (queue == null) {
+                // Fail with a descriptive error rather than a bare NullPointerException.
+                throw new IllegalArgumentException("No queue registered for destination: " + destination);
+            }
+            queue.addConsumer(deliveryTarget);
+        } else {
+            router.bind(deliveryTarget, destination);
+        }
+    }
+
+    /**
+     * Registers a queue: binds it on the router and indexes it by destination.
+     *
+     * @param queue queue to add
+     */
+    public void addQueue(MockQueue queue) {
+        router.bind(queue, queue.getDestination());
+        queues.put(queue.getDestination(), queue);
+    }
+
+    /**
+     * Marks the broker as stopping, then releases the transport server and all
+     * connections and stops every queue.
+     *
+     * @throws Exception if a component fails while shutting down
+     */
+    public final void stopServices() throws Exception {
+        stopping.set(true);
+        // The server is only assigned in startServices(); it is still null when
+        // start-up never got that far (e.g. main()'s shutdown hook after a failed bind).
+        if (transportServer != null) {
+            transportServer.release();
+        }
+
+        for (BrokerConnection connection : connections) {
+            connection.release();
+        }
+        for (BrokerConnection connection : brokerConnections) {
+            connection.release();
+        }
+        for (MockQueue queue : queues.values()) {
+            queue.stop();
+        }
+    }
+
+    /**
+     * Binds the transport server on the configured URI, targets it at a
+     * serial dispatch queue named "broker", then starts the queues and retains
+     * the broker connections.
+     *
+     * @throws Exception if binding or starting a component fails
+     */
+    public final void startServices() throws Exception {
+        brokerDispatchQueue = dispatcher.createSerialQueue("broker");
+        transportServer = TransportFactorySystem.bind(dispatcher, uri);
+        transportServer.setTargetQueue(brokerDispatchQueue);
+        transportServer.setHandler(this);
+        transportServer.resume();
+
+        for (MockQueue queue : queues.values()) {
+            queue.start();
+        }
+
+        for (BrokerConnection connection : brokerConnections) {
+            connection.retain();
+        }
+    }
+
+    public void onBind() {
+    }
+
+    public void onUnbind() {
+    }
+
+    /**
+     * Wraps an accepted transport in a {@link BrokerConnection}, records it and
+     * retains it. A failure to retain is reported through {@link #onFailure}.
+     *
+     * @param transport the newly accepted transport
+     */
+    public void onAccept(final Transport transport) {
+        BrokerConnection connection = new BrokerConnection();
+        connection.setBroker(this);
+        connection.setTransport(transport);
+        connection.setPriorityLevels(MockBrokerTest.PRIORITY_LEVELS);
+        connection.setDispatcher(dispatcher);
+        connections.add(connection);
+        try {
+            connection.retain();
+        } catch (Exception e1) {
+            onFailure(e1);
+        }
+    }
+
+    /**
+     * Prints the error and its stack trace.
+     *
+     * @param error the failure to report
+     */
+    public void onFailure(Exception error) {
+        System.out.println("Accept error: " + error);
+        error.printStackTrace();
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    /**
+     * @return the connect URI reported by the transport server; the server is
+     *         only assigned by {@link #startServices()}
+     */
+    public String getConnectURI() {
+        return transportServer.getConnectURI();
+    }
+
+    public boolean isStopping() {
+        return stopping.get();
+    }
+
+    /**
+     * Creates a dispatcher named "mock-broker" if none has been set, passing
+     * the number of available processors as the second argument.
+     */
+    protected void createDispatcher() {
+        if (dispatcher == null) {
+            dispatcher = DispatcherConfig.create("mock-broker", Runtime.getRuntime().availableProcessors());
+        }
+    }
+
+    /**
+     * Run the broker as a standalone app
+     *
+     * @param args
+     *            The arguments; {@code args[0]} is the bind URI.
+     */
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.err.println("Must supply a bind uri");
+            // Nothing to bind to: stop here instead of failing on args[0].
+            return;
+        }
+        String uri = args[0];
+
+        final MockBroker broker = new MockBroker();
+        broker.setUri(uri);
+        broker.setName("Broker");
+        broker.createDispatcher();
+        try {
+            broker.getDispatcher().retain();
+            broker.startServices();
+        } catch (Exception e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                try {
+                    broker.stopServices();
+                } catch (Exception e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+    /**
+     * Accept callback for the {@code org.apache.activemq.transport.Transport}
+     * type; currently a no-op.
+     */
+    public void onAccept(org.apache.activemq.transport.Transport transport) {
+    }
+
+}
\ No newline at end of file
|