activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961062 [5/14] - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-all/ activemq-all/src/test/ide-resources/ activemq-all/src/test/java/org/apache/activemq/jaxb/ activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...
Date Wed, 07 Jul 2010 03:24:44 GMT
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Wed Jul  7 03:24:02 2010
@@ -17,10 +17,13 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ExceptionResponse;
@@ -39,7 +42,7 @@ import org.apache.commons.logging.LogFac
 public class ResponseCorrelator extends TransportFilter {
 
     private static final Log LOG = LogFactory.getLog(ResponseCorrelator.class);
-    private final Map<Integer, FutureResponse> requestMap = new HashMap<Integer, FutureResponse>();
+    private final Map<Integer, RequestCallback> requestMap = new HashMap<Integer, RequestCallback>();
     private IntSequenceGenerator sequenceGenerator;
     private final boolean debug = LOG.isDebugEnabled();
     private IOException error;
@@ -53,36 +56,66 @@ public class ResponseCorrelator extends 
         this.sequenceGenerator = sequenceGenerator;
     }
 
-    public void oneway(Object o) throws IOException {
-        Command command = (Command)o;
-        command.setCommandId(sequenceGenerator.getNextSequenceId());
-        command.setResponseRequired(false);
-        next.oneway(command);
+    @Override
+    public void oneway(final Object o, final CompletionCallback callback) {
+        next.getDispatchQueue().execute(new Runnable(){
+            public void run() {
+                Command command = (Command)o;
+                command.setCommandId(sequenceGenerator.getNextSequenceId());
+                command.setResponseRequired(false);
+                next.oneway(command, callback);
+            }
+        });
     }
 
-    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
-        Command command = (Command)o;
-        command.setCommandId(sequenceGenerator.getNextSequenceId());
-        command.setResponseRequired(true);
-        FutureResponse future = new FutureResponse(responseCallback);
-        synchronized (requestMap) {
-            if( this.error !=null ) {
-                throw error;
+    public <T> void request(final Object o, final RequestCallback<T> responseCallback) throws IOException {
+        next.getDispatchQueue().execute(new Runnable(){
+            public void run() {
+                Command command = (Command)o;
+                command.setCommandId(sequenceGenerator.getNextSequenceId());
+                command.setResponseRequired(true);
+                requestMap.put(command.getCommandId(), responseCallback);
+                oneway(command, null);
             }
-            requestMap.put(new Integer(command.getCommandId()), future);
-        }
-        next.oneway(command);
-        return future;
+        });
     }
 
-    public Object request(Object command) throws IOException {
-        FutureResponse response = asyncRequest(command, null);
-        return response.getResult();
+    public Object request(final Object o) throws IOException {
+        return request(o, -1);
     }
 
-    public Object request(Object command, int timeout) throws IOException {
-        FutureResponse response = asyncRequest(command, null);
-        return response.getResult(timeout);
+    public Object request(final Object o, final int timeout) throws IOException {
+        final CountDownLatch done = new CountDownLatch(1);
+        final Object[] result = new Object [2];
+        request(o, new RequestCallback() {
+            public void onCompletion(Object resp) {
+                result[0] = resp;
+                done.countDown();
+            }
+            public void onFailure(Throwable caught) {
+                result[1] = caught;
+                done.countDown();
+            }
+        });
+        try {
+            if( timeout < 0 ) {
+                done.await();
+            } else {
+                if( !done.await(timeout, TimeUnit.MILLISECONDS) ) {
+                    return null;
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+        if( result[1]!=null ) {
+            if( result[1] instanceof IOException ) {
+                throw (IOException)result[1];
+            }
+            throw new RuntimeException((Throwable)result[1]);
+        }
+        return result[0];
     }
 
     public void onCommand(Object o) {
@@ -94,12 +127,10 @@ public class ResponseCorrelator extends 
         }
         if (command.isResponse()) {
             Response response = (Response)command;
-            FutureResponse future = null;
-            synchronized (requestMap) {
-                future = requestMap.remove(Integer.valueOf(response.getCorrelationId()));
-            }
-            if (future != null) {
-                future.set(response);
+            RequestCallback callback = null;
+            callback = requestMap.remove(Integer.valueOf(response.getCorrelationId()));
+            if (callback != null) {
+                callback.onCompletion(response);
             } else {
                 if (debug) {
                     LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
@@ -126,18 +157,17 @@ public class ResponseCorrelator extends 
     }
 
     private void dispose(IOException error) {
-        ArrayList<FutureResponse> requests=null; 
+        ArrayList<RequestCallback> requests=null;
         synchronized(requestMap) {
             if( this.error==null) {
                 this.error = error;
-                requests = new ArrayList<FutureResponse>(requestMap.values());
+                requests = new ArrayList<RequestCallback>(requestMap.values());
                 requestMap.clear();
             }
         }
         if( requests!=null ) {
-            for (Iterator<FutureResponse> iter = requests.iterator(); iter.hasNext();) {
-                FutureResponse fr = iter.next();
-                fr.set(new ExceptionResponse(error));
+            for (RequestCallback callback : requests) {
+                callback.onFailure(error);
             }
         }
     }
@@ -146,7 +176,4 @@ public class ResponseCorrelator extends 
         return sequenceGenerator;
     }
 
-    public String toString() {
-        return next.toString();
-    }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java Wed Jul  7 03:24:02 2010
@@ -92,16 +92,22 @@ public class WireFormatNegotiator extend
         readyCountDownLatch.countDown();
     }
 
-    public void oneway(Object command) throws IOException {
+    public void oneway(Object command, CompletionCallback callback) {
         try {
-            if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
-                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+            try {
+                if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
+                    throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new InterruptedIOException();
+            }
+        } catch (IOException e) {
+            if( callback!=null) {
+                callback.onFailure(e);
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new InterruptedIOException();
         }
-        super.oneway(command);
+        super.oneway(command, callback);
     }
 
     public void onCommand(Object o) {
@@ -165,7 +171,7 @@ public class WireFormatNegotiator extend
     }
 
     protected void sendWireFormat(WireFormatInfo info) throws IOException {
-        next.oneway(info);
+        next.oneway(info, null);
     }
 
     protected void onWireFormatNegotiated(WireFormatInfo info) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java Wed Jul  7 03:24:02 2010
@@ -29,6 +29,7 @@ import org.apache.activemq.command.Respo
 import org.apache.activemq.command.ShutdownInfo;
 
 import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.JMSExceptionSupport;
@@ -37,12 +38,12 @@ import org.apache.activemq.util.ServiceS
 public class StubConnection implements Service {
 
     private final BlockingQueue<Object> dispatchQueue = new LinkedBlockingQueue<Object>();
-    private Transport transport;
+    private ResponseCorrelator transport;
     private TransportListener listener;
     public AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 
     public StubConnection(Transport transport) throws Exception {
-        this.transport = transport;
+        this.transport = new ResponseCorrelator(transport);
         transport.setTransportListener(new DefaultTransportListener() {
             public void onCommand(Object command) {
                 try {
@@ -79,7 +80,7 @@ public class StubConnection implements S
             message.setProducerId(message.getMessageId().getProducerId());
         }
         command.setResponseRequired(false);
-        transport.oneway(command);
+        transport.oneway(command, null);
     }
 
     public Response request(Command command) throws Exception {
@@ -105,10 +106,7 @@ public class StubConnection implements S
 
     public void stop() throws Exception {
         if (transport != null) {
-            try {
-                transport.oneway(new ShutdownInfo());
-            } catch (IOException e) {
-            }
+            transport.oneway(new ShutdownInfo(), null);
             ServiceSupport.dispose(transport);
         }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestScenario.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestScenario.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestScenario.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestScenario.java Wed Jul  7 03:24:02 2010
@@ -117,7 +117,7 @@ public class BrokerTestScenario implemen
         return broker;
     }
 
-	public TransportServer createTransnportServer() throws IOException, URISyntaxException {
+	public TransportServer createTransnportServer() throws Exception {
 		return TransportFactory.bind(new URI(getBindURI()));
 	}
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Wed Jul  7 03:24:02 2010
@@ -38,10 +38,6 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
@@ -60,6 +56,8 @@ import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.queue.Subscription;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
 
 import static org.apache.activemq.dispatch.DispatchOption.*;
 
@@ -67,7 +65,6 @@ public class SharedQueuePerfTest extends
 
     private static int PERFORMANCE_SAMPLES = 5;
 
-    Dispatcher dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
@@ -84,17 +81,11 @@ public class SharedQueuePerfTest extends
     protected ArrayList<Producer> producers = new ArrayList<Producer>();
     protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
 
-    protected Dispatcher createDispatcher() {
-        return DispatcherConfig.create("pref-test", THREAD_POOL_SIZE);
-    }
-
     protected int consumerStartDelay = 0;
 
     protected void startServices() throws Exception {
-        dispatcher = createDispatcher();
-        dispatcher.resume();
         database = new BrokerDatabase(createStore());
-        database.setDispatcher(dispatcher);
+        database.setDispatchQueue(Dispatch.createQueue());
         if( TEST_MAX_STORE_LATENCY ) {
         	database.setFlushDelay(0);
         	database.setStoreBypass(false);
@@ -102,13 +93,11 @@ public class SharedQueuePerfTest extends
         database.start();
         queueStore = new BrokerQueueStore();
         queueStore.setDatabase(database);
-        queueStore.setDispatcher(dispatcher);
         queueStore.loadQueues();
     }
 
     protected void stopServices() throws Exception {
         database.stop();
-        dispatcher.release();
         consumers.clear();
         producers.clear();
         queues.clear();
@@ -218,7 +207,7 @@ public class SharedQueuePerfTest extends
             };
 
             if (consumerStartDelay > 0) {
-                dispatcher.getGlobalQueue().dispatchAfter(startConsumers, consumerStartDelay, TimeUnit.SECONDS);
+                Dispatch.getGlobalQueue().dispatchAfter(consumerStartDelay, TimeUnit.SECONDS, startConsumers);
             } else {
                 startConsumers.run();
             }
@@ -310,7 +299,7 @@ public class SharedQueuePerfTest extends
             sendRate.name("Producer " + name + " Rate");
             totalProducerRate.add(sendRate);
             
-            dispatchQueue = dispatcher.createSerialQueue(name, STICK_TO_CALLER_THREAD);
+            dispatchQueue = Dispatch.createQueue(name);
             dispatchTask = new Runnable(){
                 public void run() {
                     dispatch();
@@ -334,7 +323,7 @@ public class SharedQueuePerfTest extends
 
             Flow flow = new Flow(name, true);
             outboundQueue = new SingleFlowRelay<OpenWireMessageDelivery>(flow, name, limiter);
-            outboundQueue.setFlowExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
+            outboundQueue.setFlowExecutor(Dispatch.getGlobalQueue());
             outboundQueue.setDrain(new QueueDispatchTarget<OpenWireMessageDelivery>() {
 
                 public void drain(OpenWireMessageDelivery elem, ISourceController<OpenWireMessageDelivery> controller) {
@@ -451,7 +440,7 @@ public class SharedQueuePerfTest extends
 
             controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
             controller.useOverFlowQueue(false);
-            controller.setExecutor(dispatcher.getGlobalQueue(DispatchPriority.HIGH));
+            controller.setExecutor(Dispatch.getGlobalQueue());
 
             rate.name("Consumer " + name + " Rate");
             totalConsumerRate.add(rate);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java Wed Jul  7 03:24:02 2010
@@ -28,6 +28,7 @@ import org.apache.activemq.flow.FlowCont
 import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.fusesource.hawtdispatch.Dispatch;
 
 public class OpenwireRemoteConsumer extends RemoteConsumer {
 
@@ -40,6 +41,8 @@ public class OpenwireRemoteConsumer exte
     private ConsumerInfo consumerInfo;
 
     private Message lastMessage;
+    private int inputWindowSize;
+    private int inputResumeThreshold;
 
     protected void initialize() {
         inputWindowSize = 1000;
@@ -71,7 +74,7 @@ public class OpenwireRemoteConsumer exte
                 return null;
             }
         }, flow, limiter, inboundMutex);
-        inboundController.setExecutor(getDispatcher().getGlobalQueue(DispatchPriority.HIGH));
+        inboundController.setExecutor(Dispatch.getGlobalQueue());
 
     }
 
@@ -83,12 +86,12 @@ public class OpenwireRemoteConsumer exte
         }
 
         connectionInfo = createConnectionInfo(name);
-        transport.oneway(connectionInfo);
+        transport.oneway(connectionInfo, null);
         sessionInfo = createSessionInfo(connectionInfo);
-        transport.oneway(sessionInfo);
+        transport.oneway(sessionInfo, null);
         consumerInfo = createConsumerInfo(sessionInfo, activemqDestination, isDurable() ? name : null);
         consumerInfo.setPrefetchSize(inputWindowSize);
-        transport.oneway(consumerInfo);
+        transport.oneway(consumerInfo, null);
     }
 
     public void onCommand(Object command) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteProducer.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteProducer.java Wed Jul  7 03:24:02 2010
@@ -36,6 +36,8 @@ public class OpenwireRemoteProducer exte
     private ProducerInfo producerInfo;
     private ActiveMQDestination activemqDestination;
     private WindowLimiter<MessageDelivery> outboundLimiter;
+    private int outputWindowSize = 1024*4;
+    private int outputResumeThreshold = 1024*4;
 
     protected void setupProducer() throws Exception, IOException {
         if (destination.getDomain().equals(Router.QUEUE_DOMAIN)) {
@@ -45,12 +47,12 @@ public class OpenwireRemoteProducer exte
         }
 
         connectionInfo = createConnectionInfo(name);
-        transport.oneway(connectionInfo);
+        transport.oneway(connectionInfo, null);
         sessionInfo = createSessionInfo(connectionInfo);
-        transport.oneway(sessionInfo);
+        transport.oneway(sessionInfo, null);
         producerInfo = createProducerInfo(sessionInfo);
         producerInfo.setWindowSize(outputWindowSize);
-        transport.oneway(producerInfo);
+        transport.oneway(producerInfo, null);
     }
 
     protected void initialize() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java Wed Jul  7 03:24:02 2010
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map.Entry;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Wed Jul  7 03:24:02 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.queue;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Wed Jul  7 03:24:02 2010
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.util.Mapper;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java Wed Jul  7 03:24:02 2010
@@ -17,9 +17,8 @@
 package org.apache.activemq.queue;
 
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
 
 public interface QueueStore<K, V> {
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/RestoreListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/RestoreListener.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/RestoreListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/RestoreListener.java Wed Jul  7 03:24:02 2010
@@ -19,7 +19,7 @@ package org.apache.activemq.queue;
 import java.util.Collection;
 
 /**
- * A callback used with the {@link BrokerDatabase#restoreMessages(QueueDescriptor, boolean, long, long, int, RestoreListener)} method.
+ * A callback used with the {@link BrokerDatabase#restoreMessages(org.apache.activemq.broker.store.QueueDescriptor , boolean, long, long, int, RestoreListener)} method.
  */
 public interface RestoreListener<V> {
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java Wed Jul  7 03:24:02 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.queue;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
+
 public interface SaveableQueueElement<V> {
 
     /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Wed Jul  7 03:24:02 2010
@@ -17,6 +17,8 @@
 package org.apache.activemq.queue;
 
 import java.util.ArrayList;
+
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Wed Jul  7 03:24:02 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.queue;
 
 import java.util.HashMap;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Wed Jul  7 03:24:02 2010
@@ -23,6 +23,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowResource;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java Wed Jul  7 03:24:02 2010
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.queue;
 
-import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.flow.ISourceController;
 
 public interface Subscription<E> {

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml Wed Jul  7 03:24:02 2010
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version
+  2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+  applicable law or agreed to in writing, software distributed under
+  the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+  OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>6.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.activemq</groupId>
+  <artifactId>activemq-scala</artifactId>
+  <packaging>jar</packaging>
+  <version>6.0-SNAPSHOT</version>
+
+  <name>ActiveMQ :: Scala</name>
+  
+  <dependencies>
+    
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-util</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.hawtdispatch</groupId>
+      <artifactId>hawtdispatch-scala</artifactId>
+      <version>${hawtdispatch-version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>compile</scope>
+      <version>${scala-version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala-version}</version>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+
+    <!-- Testing -->
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j-version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <defaultGoal>install</defaultGoal>
+    <sourceDirectory>src/main/scala</sourceDirectory>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+
+    <plugins>
+      
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.13.1</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <jvmArgs>
+            <jvmArg>-Xmx1024m</jvmArg>
+          </jvmArgs>
+          <args>
+            <arg>-deprecation</arg>
+            <arg>-Xno-varargs-conversion</arg>
+          </args>
+          <scalaVersion>${scala-version}</scalaVersion>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.4.3</version>
+        <configuration>
+          <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
+          <useSystemClassLoader>false</useSystemClassLoader>
+          <!--forkMode>pertest</forkMode-->
+          <childDelegation>false</childDelegation>
+          <useFile>true</useFile>
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  
+</project>

Propchange: activemq/sandbox/activemq-apollo-actor/activemq-scala/pom.xml
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/DeliveryBuffer.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/DeliveryBuffer.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/DeliveryBuffer.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/DeliveryBuffer.scala Wed Jul  7 03:24:02 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng
+
+import _root_.java.util.LinkedList
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DeliveryBuffer(var maxSize:Int=1024*32) {
+
+  var deliveries = new LinkedList[Delivery]()
+  private var size = 0
+  var eventHandler: Runnable = null
+  
+  def full = size >= maxSize
+
+  def drain = eventHandler.run
+
+  def receive = deliveries.poll
+
+  def isEmpty = deliveries.isEmpty
+
+  def send(delivery:Delivery):Unit = {
+    delivery.retain
+    size += delivery.size
+    deliveries.addLast(delivery)
+    if( deliveries.size == 1 ) {
+      drain
+    }
+  }
+
+  def ack(delivery:Delivery) = {
+    // When a message is delivered to the consumer, we release
+    // used capacity in the outbound queue, and can drain the inbound
+    // queue
+    val wasBlocking = full
+    size -= delivery.size
+    delivery.release
+    if( !isEmpty ) {
+      drain
+    }
+  }
+
+}
+
+class DeliveryOverflowBuffer(val delivery_buffer:DeliveryBuffer) {
+
+  private var overflow = new LinkedList[Delivery]()
+
+  protected def drainOverflow:Unit = {
+    while( !overflow.isEmpty && !full ) {
+      val delivery = overflow.removeFirst
+      delivery.release
+      send_to_delivery_queue(delivery)
+    }
+  }
+
+  def send(delivery:Delivery) = {
+    if( full ) {
+      // Deliveries in the overflow queue is remain acquired by us so that
+      // producer that sent it to us gets flow controlled.
+      delivery.retain
+      overflow.addLast(delivery)
+    } else {
+      send_to_delivery_queue(delivery)
+    }
+  }
+
+  protected def send_to_delivery_queue(value:Delivery) = {
+    var delivery = Delivery(value)
+    delivery.setDisposer(^{
+      drainOverflow
+    })
+    delivery_buffer.send(delivery)
+    delivery.release
+  }
+
+  def full = delivery_buffer.full
+
+}
+
+class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
+
+  var sessions = List[CreditServer]()
+
+  var session_min_credits = 1024*4;
+  var session_credit_capacity = 1024*32
+  var session_max_credits = session_credit_capacity;
+
+  queue.retain
+  setDisposer(^{
+    source.release
+    queue.release
+  })
+
+  // use a event aggregating source to coalesce multiple events from the same thread.
+  val source = createSource(new ListEventAggregator[Delivery](), queue)
+  source.setEventHandler(^{drain_source});
+  source.resume
+
+  def drain_source = {
+    val deliveries = source.getData
+    deliveries.foreach { delivery=>
+      delivery_buffer.send(delivery)
+      delivery.release
+    }
+  }
+
+
+  class CreditServer(val producer_queue:DispatchQueue) {
+    private var _capacity = 0
+
+    def capacity(value:Int) = {
+      val change = value - _capacity;
+      _capacity = value;
+      client.credit(change)
+    }
+
+    def drain(callback:Runnable) = {
+      client.drain(callback)
+    }
+
+    val client = new CreditClient()
+
+    class CreditClient() extends DeliveryOverflowBuffer(delivery_buffer) {
+
+      producer_queue.retain
+      val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+      credit_adder.setEventHandler(^{
+        internal_credit(credit_adder.getData.intValue)
+      });
+      credit_adder.resume
+
+      private var credits = 0;
+
+      ///////////////////////////////////////////////////
+      // These methods get called from the client/producer thread...
+      ///////////////////////////////////////////////////
+      def close = {
+        credit_adder.release
+        producer_queue.release
+      }
+
+      override def full = credits <= 0
+
+      override protected def send_to_delivery_queue(value:Delivery) = {
+        var delivery = Delivery(value)
+        delivery.setDisposer(^{
+          // This is called from the server/consumer thread
+          credit_adder.merge(delivery.size);
+        })
+        internal_credit(-delivery.size)
+        source.merge(delivery)
+      }
+
+      def internal_credit(value:Int) = {
+        credits += value;
+        if( credits <= 0 ) {
+          credits = 0
+        } else {
+          drainOverflow
+        }
+      }
+
+      ///////////////////////////////////////////////////
+      // These methods get called from the server/consumer thread...
+      ///////////////////////////////////////////////////
+      def credit(value:Int) = ^{ internal_credit(value) } ->: producer_queue
+
+      def drain(callback:Runnable) = {
+        credits = 0
+        if( callback!=null ) {
+          queue << callback
+        }
+      }
+    }
+  }
+
+  def session(queue:DispatchQueue) = {
+    val session = new CreditServer(queue)
+    sessions = session :: sessions
+    session.capacity(session_max_credits)
+    session.client
+  }
+
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/Router.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/Router.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/Router.scala Wed Jul  7 03:24:02 2010
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng
+
+import _root_.java.util.concurrent.atomic.AtomicLong
+import _root_.org.apache.activemq.util.buffer._
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+import java.util.HashMap
+import collection.JavaConversions
+
+object Router {
+  val TOPIC_PREFIX = new AsciiBuffer("/topic/")
+  val QUEUE_PREFIX = new AsciiBuffer("/topic/")
+}
+/**
+ * Provides a non-blocking concurrent producer to consumer
+ * routing implementation.
+ *
+ * Producers create a route object for each destination
+ * they will be producing to.  Once the route is
+ * connected to the router, the producer can use
+ * the route.targets list without synchronization to
+ * get the current set of consumers that are bound
+ * to the destination. 
+ *
+ */
+class Router(var queue:DispatchQueue) {
+  import Router._
+  
+  trait DestinationNode {
+    var targets = List[Consumer]()
+    var routes = List[ProducerRoute]()
+
+    def on_bind(x:List[Consumer]):Unit
+    def on_unbind(x:List[Consumer]):Boolean
+    def on_connect(route:ProducerRoute):Unit
+    def on_disconnect(route:ProducerRoute):Boolean = {
+      routes = routes.filterNot({r=> route==r})
+      route.disconnected()
+      routes == Nil && targets == Nil
+    }
+  }
+
+  class TopicDestinationNode extends DestinationNode {
+    def on_bind(x:List[Consumer]) =  {
+      targets = x ::: targets
+      routes.foreach({r=>
+        r.bind(x)
+      })
+    }
+
+    def on_unbind(x:List[Consumer]):Boolean = {
+      targets = targets.filterNot({t=>x.contains(t)})
+      routes.foreach({r=>
+        r.unbind(x)
+      })
+      routes == Nil && targets == Nil
+    }
+
+    def on_connect(route:ProducerRoute) = {
+      routes = route :: routes
+      route.connected(targets)
+    }
+  }
+
+  class QueueDestinationNode(destination:AsciiBuffer) extends DestinationNode {
+    val queue = new StompQueue(destination)
+
+    def on_bind(x:List[Consumer]) =  {
+      targets = x ::: targets
+      queue.bind(x)
+    }
+
+    def on_unbind(x:List[Consumer]):Boolean = {
+      targets = targets.filterNot({t=>x.contains(t)})
+      queue.unbind(x)
+      routes == Nil && targets == Nil
+    }
+
+    def on_connect(route:ProducerRoute) = {
+      routes = route :: routes
+      route.connected(queue :: Nil)
+    }
+  }
+
+  var destinations = new HashMap[AsciiBuffer, DestinationNode]()
+
+  private def get(destination:AsciiBuffer):DestinationNode = {
+    var result = destinations.get(destination)
+    if( result ==null ) {
+      if( isTopic(destination) ) {
+        result = new TopicDestinationNode
+      } else {
+        result = new QueueDestinationNode(destination)
+      }
+      destinations.put(destination, result)
+    }
+    result
+  }
+
+  def bind(destination:AsciiBuffer, targets:List[Consumer]) = retaining(targets) {
+      get(destination).on_bind(targets)
+    } ->: queue
+
+  def unbind(destination:AsciiBuffer, targets:List[Consumer]) = releasing(targets) {
+      if( get(destination).on_unbind(targets) ) {
+        destinations.remove(destination)
+      }
+    } ->: queue
+
+  def connect(destination:AsciiBuffer, routeQueue:DispatchQueue, producer:Producer)(completed: (ProducerRoute)=>Unit) = {
+    val route = new ProducerRoute(destination, routeQueue, producer) {
+      override def on_connected = {
+        completed(this);
+      }
+    }
+    ^ {
+      get(destination).on_connect(route)
+    } ->: queue
+  }
+
+  def isTopic(destination:AsciiBuffer) = destination.startsWith(TOPIC_PREFIX)
+  def isQueue(destination:AsciiBuffer) = !isTopic(destination)
+
+  def disconnect(route:ProducerRoute) = releasing(route) {
+      get(route.destination).on_disconnect(route)
+    } ->: queue
+
+
+   def each(proc:(AsciiBuffer, DestinationNode)=>Unit) = {
+     import JavaConversions._;
+     for( (destination, node) <- destinations ) {
+        proc(destination, node)
+     }
+   }
+
+}
+
+trait Route extends Retained {
+
+  val destination:AsciiBuffer
+  val queue:DispatchQueue
+  val metric = new AtomicLong();
+
+  def connected(targets:List[Consumer]):Unit
+  def bind(targets:List[Consumer]):Unit
+  def unbind(targets:List[Consumer]):Unit
+  def disconnected():Unit
+
+}
+
+class ProducerRoute(val destination:AsciiBuffer, val queue:DispatchQueue, val producer:Producer) extends BaseRetained with Route {
+
+
+  // Retain the queue while we are retained.
+  queue.retain
+  setDisposer(^{
+    queue.release
+  })
+
+  var targets = List[ConsumerSession]()
+
+  def connected(targets:List[Consumer]) = retaining(targets) {
+    internal_bind(targets)
+    on_connected
+  } ->: queue
+
+  def bind(targets:List[Consumer]) = retaining(targets) {
+    internal_bind(targets)
+  } ->: queue
+
+  private def internal_bind(values:List[Consumer]) = {
+    values.foreach{ x=>
+      targets = x.open_session(queue) :: targets
+    }
+  }
+
+  def unbind(targets:List[Consumer]) = releasing(targets) {
+    this.targets = this.targets.filterNot { x=>
+      val rc = targets.contains(x.consumer)
+      if( rc ) {
+        x.close
+      }
+      rc
+    }
+  } ->: queue
+
+  def disconnected() = ^ {
+    this.targets.foreach { x=>
+      x.close
+      x.consumer.release
+    }    
+  } ->: queue
+
+  protected def on_connected = {}
+  protected def on_disconnected = {}
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/Stomp.java?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/Stomp.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/Stomp.java Wed Jul  7 03:24:02 2010
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng;
+import org.apache.activemq.util.buffer.*;
+
+
+public interface Stomp {
+    
+    Buffer EMPTY_BUFFER = new Buffer(0);
+    byte NULL = 0;
+    Buffer NULL_BUFFER = new Buffer(new byte[]{NULL});
+    byte NEWLINE = '\n';
+    Buffer NEWLINE_BUFFER = new Buffer(new byte[]{NEWLINE});
+    Buffer END_OF_FRAME_BUFFER = new Buffer(new byte[]{NULL, NEWLINE});
+    
+    AsciiBuffer TRUE = new AsciiBuffer("true");
+    AsciiBuffer FALSE = new AsciiBuffer("false");
+
+    public static interface Commands {
+        AsciiBuffer CONNECT = new AsciiBuffer("CONNECT");
+        AsciiBuffer SEND = new AsciiBuffer("SEND");
+        AsciiBuffer DISCONNECT = new AsciiBuffer("DISCONNECT");
+        AsciiBuffer SUBSCRIBE = new AsciiBuffer("SUBSCRIBE");
+        AsciiBuffer UNSUBSCRIBE = new AsciiBuffer("UNSUBSCRIBE");
+
+        AsciiBuffer BEGIN_TRANSACTION = new AsciiBuffer("BEGIN");
+        AsciiBuffer COMMIT_TRANSACTION = new AsciiBuffer("COMMIT");
+        AsciiBuffer ABORT_TRANSACTION = new AsciiBuffer("ABORT");
+        AsciiBuffer BEGIN = new AsciiBuffer("BEGIN");
+        AsciiBuffer COMMIT = new AsciiBuffer("COMMIT");
+        AsciiBuffer ABORT = new AsciiBuffer("ABORT");
+        AsciiBuffer ACK = new AsciiBuffer("ACK");
+    }
+
+    public interface Responses {
+        AsciiBuffer CONNECTED = new AsciiBuffer("CONNECTED");
+        AsciiBuffer ERROR = new AsciiBuffer("ERROR");
+        AsciiBuffer MESSAGE = new AsciiBuffer("MESSAGE");
+        AsciiBuffer RECEIPT = new AsciiBuffer("RECEIPT");
+    }
+
+    public interface Headers {
+        byte SEPERATOR = ':';
+        Buffer SEPERATOR_BUFFER = new Buffer(new byte[]{SEPERATOR});
+        
+        AsciiBuffer RECEIPT_REQUESTED = new AsciiBuffer("receipt");
+        AsciiBuffer TRANSACTION = new AsciiBuffer("transaction");
+        AsciiBuffer CONTENT_LENGTH = new AsciiBuffer("content-length");
+        AsciiBuffer TRANSFORMATION = new AsciiBuffer("transformation");
+        AsciiBuffer TRANSFORMATION_ERROR = new AsciiBuffer("transformation-error");
+
+        public interface Response {
+            AsciiBuffer RECEIPT_ID = new AsciiBuffer("receipt-id");
+        }
+
+        public interface Send {
+            AsciiBuffer DESTINATION = new AsciiBuffer("destination");
+            AsciiBuffer CORRELATION_ID = new AsciiBuffer("correlation-id");
+            AsciiBuffer REPLY_TO = new AsciiBuffer("reply-to");
+            AsciiBuffer EXPIRATION_TIME = new AsciiBuffer("expires");
+            AsciiBuffer PRIORITY = new AsciiBuffer("priority");
+            AsciiBuffer TYPE = new AsciiBuffer("type");
+            AsciiBuffer PERSISTENT = new AsciiBuffer("persistent");
+        }
+
+        public interface Message {
+            AsciiBuffer MESSAGE_ID = new AsciiBuffer("message-id");
+            AsciiBuffer DESTINATION = new AsciiBuffer("destination");
+            AsciiBuffer CORRELATION_ID = new AsciiBuffer("correlation-id");
+            AsciiBuffer EXPIRATION_TIME = new AsciiBuffer("expires");
+            AsciiBuffer REPLY_TO = new AsciiBuffer("reply-to");
+            AsciiBuffer PRORITY = new AsciiBuffer("priority");
+            AsciiBuffer REDELIVERED = new AsciiBuffer("redelivered");
+            AsciiBuffer TIMESTAMP = new AsciiBuffer("timestamp");
+            AsciiBuffer TYPE = new AsciiBuffer("type");
+            AsciiBuffer SUBSCRIPTION = new AsciiBuffer("subscription");
+        }
+
+        public interface Subscribe {
+            AsciiBuffer DESTINATION = new AsciiBuffer("destination");
+            AsciiBuffer ACK_MODE = new AsciiBuffer("ack");
+            AsciiBuffer ID = new AsciiBuffer("id");
+            AsciiBuffer SELECTOR = new AsciiBuffer("selector");
+
+            public interface AckModeValues {
+                AsciiBuffer AUTO = new AsciiBuffer("auto");
+                AsciiBuffer CLIENT = new AsciiBuffer("client");
+                AsciiBuffer INDIVIDUAL = new AsciiBuffer("client-individual");
+            }
+        }
+
+        public interface Unsubscribe {
+            AsciiBuffer DESTINATION = new AsciiBuffer("destination");
+            AsciiBuffer ID = new AsciiBuffer("id");
+        }
+
+        public interface Connect {
+            AsciiBuffer LOGIN = new AsciiBuffer("login");
+            AsciiBuffer PASSCODE = new AsciiBuffer("passcode");
+            AsciiBuffer CLIENT_ID = new AsciiBuffer("client-id");
+            AsciiBuffer REQUEST_ID = new AsciiBuffer("request-id");
+        }
+
+        public interface Error {
+            AsciiBuffer MESSAGE = new AsciiBuffer("message");
+        }
+
+        public interface Connected {
+            AsciiBuffer SESSION = new AsciiBuffer("session");
+            AsciiBuffer RESPONSE_ID = new AsciiBuffer("response-id");
+        }
+
+        public interface Ack {
+            AsciiBuffer MESSAGE_ID = new AsciiBuffer("message-id");
+        }
+    }
+    
+	public enum Transformations {
+		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
+		
+		public String toString() {
+			return name().replaceAll("_", "-").toLowerCase();
+		}
+		
+		public static Transformations getValue(String value) {
+			return valueOf(value.replaceAll("-", "_").toUpperCase());
+		}
+	}    
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompBroker.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompBroker.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompBroker.scala Wed Jul  7 03:24:02 2010
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng
+
+import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.util.concurrent.TimeUnit
+import _root_.java.util.LinkedList
+import _root_.org.apache.activemq.util.buffer._
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import java.nio.channels.SelectionKey._
+
+import java.net.{InetAddress, InetSocketAddress}
+
+import java.nio.channels.{ServerSocketChannel}
+
+object Delivery {
+  type HeaderMap = LinkedList[(AsciiBuffer, AsciiBuffer)]
+  def apply(frame:StompFrame) = new Delivery(frame.headers, frame.content, frame.size)
+  def apply(d:Delivery) = new Delivery(d.headers, d.content, d.size)
+}
+  
+case class Delivery(headers:Delivery.HeaderMap, content:Buffer, size:Int) extends BaseRetained {
+}
+
+trait Producer {
+  def collocate(queue:DispatchQueue):Unit
+}
+
+trait Consumer extends Retained {
+  val queue:DispatchQueue;
+  def open_session(producer_queue:DispatchQueue):ConsumerSession
+
+}
+
+trait ConsumerSession {
+  val consumer:Consumer
+  def deliver(delivery:Delivery)
+  def close:Unit
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object StompBroker {
+
+
+  def main(args:Array[String]) = {
+    println("Starting stomp broker...")
+    val broker = new StompBroker();
+    println("Startup complete.")
+    System.in.read
+    println("Shutting down...")
+    broker.close
+    println("Shutdown complete.")
+  }
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StompBroker {
+  import StompBroker._
+
+  val router = new Router(createQueue("router"))
+  val queue = createQueue("broker")
+
+    // Create the nio server socket...
+  val channel = ServerSocketChannel.open();
+  channel.configureBlocking(false);
+  channel.socket().bind(address("0.0.0.0", 61613), 500);
+
+  // Create a source attached to the server socket to deal with new connections...
+  val accept_source = createSource(channel, OP_ACCEPT, queue);
+  accept_source.setEventHandler(^{
+
+    // Accept a new socket connection
+    var socket = channel.accept();
+    try {
+      socket.configureBlocking(false);
+      socket.socket.setSoLinger(true,0);
+      var connection = new StompConnection(socket, router)
+    } catch {
+      case e:Exception=>
+        socket.close
+    }
+
+  });
+  accept_source.setCancelHandler(^{
+    channel.close();
+  });
+
+  // Start listening for accept events..
+  accept_source.resume();
+
+  def close = {
+    accept_source.cancel
+    accept_source.release
+    queue.release
+  }
+
+  private  def address(host: String, port: Int): InetSocketAddress = {
+    return new InetSocketAddress(ip(host), port)
+  }
+
+  private def ip(host: String): InetAddress = {
+    return InetAddress.getByName(host)
+  }
+
+  // Try to periodically re-balance connections so that consumers/producers
+  // are on the same thread.
+  val reblance = ^{
+    router.each { (destination,node)=>
+      // for now just collocate the producer to the consumer's thread..
+      if( !node.targets.isEmpty ) {
+        val target =  node.targets.head.queue
+        if( node.isInstanceOf[Router#QueueDestinationNode] ) {
+          node.asInstanceOf[Router#QueueDestinationNode].queue.collocate(target)
+        }
+        for( route <- node.routes ) {
+          route.producer.collocate( target )
+        }
+      }
+    }
+    schedualRebalance
+  }
+  
+  def schedualRebalance:Unit = router.queue.dispatchAfter(1000, TimeUnit.MILLISECONDS, reblance)
+  schedualRebalance
+}
+
+

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala Wed Jul  7 03:24:02 2010
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng
+
+import _root_.java.util.{LinkedList}
+import _root_.org.apache.activemq.ng.Stomp
+import java.nio.channels.SelectionKey._
+import _root_.org.apache.activemq.util.buffer._
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+import AsciiBuffer._
+import java.util.concurrent.atomic.AtomicLong
+import java.nio.channels.{SocketChannel}
+import java.io.{IOException}
+import Stomp.{Headers, Responses, Commands}
+import collection.mutable.{HashMap}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object StompConnection {
+  val connectionCounter = new AtomicLong();
+  var bufferSize = 1024*64
+  var maxOutboundSize = 100;
+}
+class StompConnection(val socket:SocketChannel, var router:Router) {
+
+  import Delivery._
+  import StompConnection._
+
+  socket.socket.setSendBufferSize(bufferSize)
+  socket.socket.setReceiveBufferSize(bufferSize)
+
+  val queue = createQueue("connection:"+connectionCounter.incrementAndGet)
+  queue.setTargetQueue(getRandomThreadQueue)
+
+//    println("connected from: "+socket.socket.getRemoteSocketAddress)
+
+  val wireFormat = new StompWireFormat()
+
+
+  val outboundChannel  = new DeliveryBuffer
+  var outbound = new LinkedList[StompFrame]()
+
+  var closed = false;
+  var consumer:SimpleConsumer = null
+
+  val write_source = createSource(socket, OP_WRITE, queue);
+  val read_source = createSource(socket, OP_READ, queue);
+
+  queue.setDisposer(^{
+    socket.close();
+  })
+
+  read_source.setEventHandler(^{
+    try {
+      wireFormat.drain_socket(socket) {
+        frame:StompFrame=>
+          on_frame(frame)
+          read_source.isSuspended
+      }
+    } catch {
+      case e:IOException =>
+        // The peer disconnected..
+        close
+    }
+  });
+
+  read_source.resume();
+
+  def drain_outbound_data = wireFormat.drain_source(socket) { poll_outbound }
+
+  def poll_outbound = {
+    var rc = outbound.poll
+    if( rc==null ) {
+      val delivery = outboundChannel.receive
+      if( delivery!=null ) {
+        rc = StompFrame(Responses.MESSAGE, delivery.headers, delivery.content)
+        outboundChannel.ack(delivery)
+      }
+    }
+    rc
+  }
+
+  write_source.setEventHandler(^{
+    try {
+      if( drain_outbound_data ) {
+        write_source.suspend
+      }
+    } catch {
+      case e:IOException=>
+        close // The peer must have closed on us..
+    }
+  });
+
+
+  def send(frame:StompFrame) = {
+    outbound.add(frame)
+    if( outbound.size == 1 && outboundChannel.isEmpty ) {
+      write_source.resume
+    }
+  }
+
+  outboundChannel.eventHandler = ^{
+    if( outbound.isEmpty && outboundChannel.deliveries.size==1 ) {
+      write_source.resume
+    }
+  }
+
+  def close = {
+    if( !closed ) {
+      closed=true;
+      if( producerRoute!=null ) {
+        router.disconnect(producerRoute)
+        producerRoute=null
+      }
+      if( consumer!=null ) {
+        router.unbind(consumer.dest, consumer::Nil)
+        consumer=null
+      }
+      write_source.cancel
+      write_source.release
+      read_source.cancel
+      read_source.release
+      queue.release
+    }
+  }
+
+  def on_frame(frame:StompFrame) = {
+    frame match {
+      case StompFrame(Commands.CONNECT, headers, _) =>
+        on_stomp_connect(headers)
+      case StompFrame(Commands.SEND, headers, content) =>
+        on_stomp_send(Delivery(frame))
+      case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+        on_stomp_subscribe(headers)
+      case StompFrame(Commands.ACK, headers, content) =>
+        // TODO:
+      case StompFrame(Commands.DISCONNECT, headers, content) =>
+        close
+      case StompFrame(unknown, _, _) =>
+        die("Unsupported STOMP command: "+unknown);
+    }
+  }
+
+  def on_stomp_connect(headers:HeaderMap) = {
+    println("connected on: "+Thread.currentThread.getName);
+    send(StompFrame(Responses.CONNECTED))
+  }
+
+  var producerRoute:ProducerRoute=null
+
+
+  def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
+    val i = headers.iterator
+    while( i.hasNext ) {
+      val entry = i.next
+      if( entry._1 == name ) {
+        return Some(entry._2)
+      }
+    }
+    None
+  }
+
+  def on_stomp_send(delivery:Delivery) = {
+    get(delivery.headers, Headers.Send.DESTINATION) match {
+      case Some(dest)=>
+        // create the producer route...
+        if( producerRoute==null || producerRoute.destination!= dest ) {
+
+          // clean up the previous producer..
+          if( producerRoute!=null ) {
+            router.disconnect(producerRoute)
+            producerRoute=null
+          }
+
+          val producer = new Producer() {
+            override def collocate(value:DispatchQueue):Unit = ^{
+              if( value.getTargetQueue ne queue.getTargetQueue ) {
+                println("sender on "+queue.getLabel+" co-locating with: "+value.getLabel);
+                queue.setTargetQueue(value.getTargetQueue)
+                write_source.setTargetQueue(queue);
+                read_source.setTargetQueue(queue)
+              }
+
+            } ->: queue
+          }
+
+          // don't process frames until we are connected..
+          read_source.suspend
+          router.connect(dest, queue, producer) {
+            (route) =>
+              read_source.resume
+              producerRoute = route
+              send_via_route(producerRoute, delivery)
+          }
+        } else {
+          // we can re-use the existing producer route
+          send_via_route(producerRoute, delivery)
+        }
+      case None=>
+        die("destination not set.")
+    }
+  }
+
+  def send_via_route(route:ProducerRoute, delivery:Delivery) = {
+    if( !route.targets.isEmpty ) {
+      read_source.suspend
+      delivery.setDisposer(^{
+        read_source.resume
+      })
+      route.targets.foreach(consumer=>{
+        consumer.deliver(delivery)
+      })
+      delivery.release;
+    }
+  }
+
+  def on_stomp_subscribe(headers:HeaderMap) = {
+    println("Consumer on "+Thread.currentThread.getName)
+    get(headers, Headers.Subscribe.DESTINATION) match {
+      case Some(dest)=>
+        if( consumer !=null ) {
+          die("Only one subscription supported.")
+
+        } else {
+          consumer = new SimpleConsumer(dest);
+          router.bind(dest, consumer :: Nil)
+          consumer.release
+        }
+      case None=>
+        die("destination not set.")
+    }
+
+  }
+
+  private def die(msg:String) = {
+    println("Shutting connection down due to: "+msg)
+    read_source.suspend
+    send(StompFrame(Responses.ERROR, new LinkedList(), ascii(msg)))
+    ^ {
+      close
+    } ->: queue
+  }
+
+  class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with Consumer {
+
+    val queue:DispatchQueue = StompConnection.this.queue
+    setDisposer(^{
+      queue.release
+    })
+    val deliveryQueue = new DeliveryCreditBufferProtocol(outboundChannel, queue)
+
+    def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
+      val session = deliveryQueue.session(producer_queue)
+
+      val consumer = SimpleConsumer.this
+      retain
+
+      def deliver(delivery:Delivery) = session.send(delivery)
+
+      def close = {
+        session.close
+        release
+      }
+    }
+    
+  }
+    
+//  class SimpleConsumer(val dest:AsciiBuffer) extends Consumer with BaseRetained {
+//
+//    val queue:DispatchQueue = StompConnection.this.queue
+//    setDisposer(^{
+//      queue.release
+//    })
+//
+//    def open_session = new ConsumerSession {
+//      val consumer = SimpleConsumer.this
+//      val deliveryQueue = new DeliveryOverflowBuffer(outboundChannel)
+//      retain
+//
+//      def deliver(delivery:Delivery) = using(delivery) {
+//        deliveryQueue.send(delivery)
+//      } ->: queue
+//
+//      def close = {
+//        release
+//      }
+//    }
+//
+//  }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompFrame.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompFrame.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompFrame.scala Wed Jul  7 03:24:02 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng
+
+import _root_.java.util.LinkedList
+import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.Map
+import collection.mutable.HashMap
+
+object StompFrame{
+  var NO_DATA = new Buffer(0);
+}
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+case class StompFrame(action:AsciiBuffer, headers:LinkedList[(AsciiBuffer, AsciiBuffer)]=new LinkedList(), content:Buffer=StompFrame.NO_DATA) {
+  def headerSize = {
+    if( headers.isEmpty ) {
+      0
+    } else {
+      // if all the headers were part of the same input buffer.. size can be calculated by
+      // subtracting positions in the buffer.
+      val firstBuffer = headers.getFirst._1
+      val lastBuffer =  headers.getLast._2
+      if( firstBuffer.data eq lastBuffer.data ) {
+        (lastBuffer.offset-firstBuffer.offset)+lastBuffer.length+1
+      } else {
+        // gota do it the hard way
+        var rc = 0;
+        val i = headers.iterator
+        while( i.hasNext ) {
+          val (key, value) = i.next
+          rc += key.length + value.length +2
+        }
+        rc
+      }
+    }
+  }
+
+  def size = {
+     if( action.data eq content.data ) {
+        (content.offset-action.offset)+content.length
+     } else {
+       action.length + 1 +
+       headerSize + 1 + content.length
+     }
+  }
+
+//    public StompFrame(AsciiBuffer command) {
+//    	this(command, null, null);
+//    }
+//
+//    public StompFrame(AsciiBuffer command, Map<AsciiBuffer, AsciiBuffer> headers) {
+//    	this(command, headers, null);
+//    }
+//
+//    public StompFrame(AsciiBuffer command, Map<AsciiBuffer, AsciiBuffer> headers, Buffer data) {
+//        this.action = command;
+//        if (headers != null)
+//        	this.headers = headers;
+//        if (data != null)
+//        	this.content = data;
+//    }
+//
+//    public StompFrame() {
+//    }
+
+
+//    public String toString() {
+//        StringBuffer buffer = new StringBuffer();
+//        buffer.append(getAction());
+//        buffer.append("\n");
+//
+//        for (Entry<AsciiBuffer, AsciiBuffer> entry : headers.entrySet()) {
+//            buffer.append(entry.getKey());
+//            buffer.append(":");
+//            buffer.append(entry.getValue());
+//            buffer.append("\n");
+//        }
+//
+//        buffer.append("\n");
+//        if (getContent() != null) {
+//            try {
+//                buffer.append(getContent());
+//            } catch (Throwable e) {
+//            }
+//        }
+//        return buffer.toString();
+//    }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompLoadClient.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompLoadClient.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompLoadClient.scala Wed Jul  7 03:24:02 2010
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.fusesource.hawtdispatch.example.stomp
+
+import _root_.java.io._
+import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import _root_.org.apache.activemq.util.buffer.AsciiBuffer
+import java.net.{ProtocolException, InetSocketAddress, URI, Socket}
+
+import java.lang.String._
+import java.util.concurrent.TimeUnit._
+import collection.mutable.Map
+
+/**
+ *
+ * Simulates load on the a stomp broker.
+ *
+ */
+object StompLoadClient {
+
+  val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+  import StompLoadClient._
+  implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
+
+  var producerSleep = 0;
+  var consumerSleep = 0;
+  var producers = 1;
+  var consumers = 1;
+  var sampleInterval = 5 * 1000;
+  var uri = "stomp://127.0.0.1:61613";
+  var bufferSize = 64*1204
+  var messageSize = 1024;
+  var useContentLength=true
+
+  var destinationType = "queue";
+  var destinationCount = 1;
+
+  val producerCounter = new AtomicLong();
+  val consumerCounter = new AtomicLong();
+  val done = new AtomicBoolean()
+
+  def main(args:Array[String]) = run
+
+  def run() = {
+
+    println("=======================")
+    println("Press ENTER to shutdown");
+    println("=======================")
+    println("")
+
+
+    done.set(false)
+    var producerThreads = List[ProducerThread]()
+    for (i <- 0 until producers) {
+      val producerThread = new ProducerThread(i);
+      producerThreads = producerThread :: producerThreads
+      producerThread.start();
+    }
+
+    var consumerThreads = List[ConsumerThread]()
+    for (i <- 0 until consumers) {
+      val consumerThread = new ConsumerThread(i);
+      consumerThreads = consumerThread :: consumerThreads
+      consumerThread.start();
+    }
+
+    // start a sampling thread...
+    val sampleThread = new Thread() {
+      override def run() = {
+        try {
+          var start = System.nanoTime();
+          while( !done.get ) {
+            Thread.sleep(sampleInterval)
+            val end = System.nanoTime();
+            printRate("Producer", producerCounter, end - start);
+            printRate("Consumer", consumerCounter, end - start);
+            start = end;
+          }
+        } catch {
+          case e:InterruptedException =>
+        }
+      }
+    }
+    sampleThread.start()
+
+
+    System.in.read()
+    println("=======================")
+    done.set(true)
+
+    // wait for the threads to finish..
+    for( thread <- consumerThreads ) {
+      thread.client.close
+      thread.interrupt
+      thread.join
+    }
+    for( thread <- producerThreads ) {
+      thread.client.close
+      thread.interrupt
+      thread.join
+    }
+    sampleThread.interrupt
+    sampleThread.join
+
+    println("Shutdown");
+    println("=======================")
+
+  }
+
+  override def toString() = {
+    "--------------------------------------\n"+
+    "StompLoadClient Properties\n"+
+    "--------------------------------------\n"+
+    "uri              = "+uri+"\n"+
+    "producers        = "+producers+"\n"+
+    "consumers        = "+consumers+"\n"+
+    "destinationType  = "+destinationType+"\n"+
+    "destinationCount = "+destinationCount+"\n" +
+    "messageSize      = "+messageSize+"\n"+
+    "producerSleep    = "+producerSleep+"\n"+
+    "consumerSleep    = "+consumerSleep+"\n"+
+    "bufferSize       = "+bufferSize+"\n"+
+    "useContentLength = "+useContentLength+"\n"+
+    "sampleInterval   = "+sampleInterval+"\n"
+  }
+
+  def printRate(name: String, counter: AtomicLong, nanos: Long) = {
+    val c = counter.getAndSet(0);
+    val rate_per_second: java.lang.Float = ((1.0f * c / nanos) * NANOS_PER_SECOND);
+    println(format("%s rate: %,.3f per second", name, rate_per_second));
+  }
+
+  def destination(i:Int) = "/"+destinationType+"/load-"+(i%destinationCount)
+
+
+  object StompClient {
+    def connect(proc: StompClient=>Unit ) = {
+      val client = new StompClient();
+      try {
+        val connectUri = new URI(uri);
+        client.open(connectUri.getHost(), connectUri.getPort());
+        client.send("""CONNECT
+
+""")
+        client.flush
+        client.receive("CONNECTED")
+        proc(client)
+      } catch {
+        case e: Throwable =>
+          if(!done.get) {
+            println("failure occured: "+e);
+            Thread.sleep(1000);
+          }
+      } finally {
+        try {
+          client.close();
+        } catch {
+          case ignore: Throwable =>
+        }
+      }
+    }
+  }
+
+  class StompClient {
+
+    var socket:Socket = null
+    var out:OutputStream = null;
+    var in:InputStream = null
+
+    def open(host: String, port: Int) = {
+      socket = new Socket
+      socket.connect(new InetSocketAddress(host, port))
+      socket.setSoLinger(true, 0);
+      out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
+      in = new BufferedInputStream(socket.getInputStream, bufferSize)
+    }
+
+    def close() = {
+      if( socket!=null ) {
+        socket.close
+        socket = null
+        out = null
+        in = null
+      }
+    }
+
+    def flush() = {
+      out.flush
+    }
+
+    def send(frame:String) = {
+      out.write(frame.getBytes("UTF-8"))
+      out.write(0)
+      out.write('\n')
+    }
+
+    def send(frame:Array[Byte]) = {
+      out.write(frame)
+      out.write(0)
+      out.write('\n')
+    }
+
+    def skip():Unit = {
+      var c = in.read;
+      while( c >= 0 ) {
+        if( c==0 ) {
+          return;
+        }
+        c = in.read()
+      }
+      throw new EOFException()
+    }
+
+    def receive():String = {
+      val buffer = new ByteArrayOutputStream(500)
+      var c = in.read;
+      while( c >= 0 ) {
+        if( c==0 ) {
+          return new String(buffer.toByteArray, "UTF-8")
+        }
+        buffer.write(c);
+        c = in.read()
+      }
+      throw new EOFException()
+    }
+
+    def receive(expect:String):String = {
+      val rc = receive()
+      if( !rc.trimFront.startsWith(expect) ) {
+        throw new ProtocolException("Expected "+expect)
+      }
+      rc
+    }
+
+  }
+
+  class ProducerThread(val id: Int) extends Thread {
+    val name: String = "producer " + id;
+    var client:StompClient=null
+    val content = ("SEND\n" +
+              "destination:"+destination(id)+"\n"+
+               { if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
+              "\n"+message(name)).getBytes("UTF-8")
+
+    override def run() {
+      while (!done.get) {
+        StompClient.connect { client =>
+          this.client=client
+          var i =0;
+          while (!done.get) {
+            client.send(content)
+            producerCounter.incrementAndGet();
+            Thread.sleep(producerSleep);
+            i += 1
+          }
+        }
+      }
+    }
+  }
+
+  def message(name:String) = {
+    val buffer = new StringBuffer(messageSize)
+    buffer.append("Message from " + name+"\n");
+    for( i <- buffer.length to messageSize ) {
+      buffer.append(('a'+(i%26)).toChar)
+    }
+    var rc = buffer.toString
+    if( rc.length > messageSize ) {
+      rc.substring(0, messageSize)
+    } else {
+      rc
+    }
+  }
+
+  class ConsumerThread(val id: Int) extends Thread {
+    val name: String = "producer " + id;
+    var client:StompClient=null
+
+    override def run() {
+      while (!done.get) {
+        StompClient.connect { client =>
+          this.client=client
+          val headers = Map[AsciiBuffer, AsciiBuffer]();
+          client.send("""
+SUBSCRIBE
+destination:"""+destination(id)+"""
+
+""")
+          client.flush
+
+          while (!done.get) {
+            client.skip
+            consumerCounter.incrementAndGet();
+            Thread.sleep(consumerSleep);
+          }
+        }
+      }
+    }
+  }
+
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompQueue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompQueue.scala?rev=961062&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompQueue.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompQueue.scala Wed Jul  7 03:24:02 2010
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.ng
+
+import _root_.java.util.{LinkedList}
+import _root_.org.apache.activemq.util.buffer._
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+import collection.mutable.{HashMap}
+import collection.immutable.Queue
+
+object StompQueue {
+  val maxOutboundSize = 1024*1204*5
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StompQueue(val destination:AsciiBuffer) extends BaseRetained with Route with Consumer with Producer {
+  
+  import StompQueue._;
+  
+  override val queue:DispatchQueue = createQueue("queue:"+destination);
+  queue.setTargetQueue(getRandomThreadQueue)
+  setDisposer(^{
+    queue.release
+  })
+
+
+  val delivery_buffer  = new DeliveryBuffer
+
+  class ConsumerState(val consumer:ConsumerSession) {
+    var bound=true
+
+    def deliver(value:Delivery):Unit = {
+      val delivery = Delivery(value)
+      delivery.setDisposer(^{
+        ^{ completed(value) } ->:queue
+      })
+      consumer.deliver(delivery);
+      delivery.release
+    }
+
+    def completed(delivery:Delivery) = {
+      // Lets get back on the readyList if  we are still bound.
+      if( bound ) {
+        readyConsumers.addLast(this)
+      }
+      delivery_buffer.ack(delivery)
+    }
+  }
+
+  var allConsumers = Map[Consumer,ConsumerState]()
+  val readyConsumers = new LinkedList[ConsumerState]()
+
+  def connected(consumers:List[Consumer]) = bind(consumers)
+  def bind(consumers:List[Consumer]) = retaining(consumers) {
+      for ( consumer <- consumers ) {
+        val cs = new ConsumerState(consumer.open_session(queue))
+        allConsumers += consumer->cs
+        readyConsumers.addLast(cs)
+      }
+      delivery_buffer.eventHandler.run
+    } ->: queue
+
+  def unbind(consumers:List[Consumer]) = releasing(consumers) {
+      for ( consumer <- consumers ) {
+        allConsumers.get(consumer) match {
+          case Some(cs)=>
+            cs.bound = false
+            cs.consumer.close
+            allConsumers -= consumer
+            readyConsumers.remove(cs)
+          case None=>
+        }
+      }
+    } ->: queue
+
+  def disconnected() = throw new RuntimeException("unsupported")
+
+  def collocate(value:DispatchQueue):Unit = {
+    if( value.getTargetQueue ne queue.getTargetQueue ) {
+      println(queue.getLabel+" co-locating with: "+value.getLabel);
+      this.queue.setTargetQueue(value.getTargetQueue)
+    }
+  }
+
+
+  delivery_buffer.eventHandler = ^{
+    while( !readyConsumers.isEmpty && !delivery_buffer.isEmpty ) {
+      val cs = readyConsumers.removeFirst
+      val delivery = delivery_buffer.receive
+      cs.deliver(delivery)
+    }
+  }
+
+
+  val deliveryQueue = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
+  def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
+    val session = deliveryQueue.session(producer_queue)
+    val consumer = StompQueue.this
+    retain
+
+    def deliver(delivery:Delivery) = session.send(delivery)
+    def close = {
+      session.close
+      release
+    }
+  }
+
+//  def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
+//    val consumer = StompQueue.this
+//    val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer)
+//    retain
+//
+//    def deliver(delivery:Delivery) = using(delivery) {
+//      deliveryQueue.send(delivery)
+//    } ->: queue
+//
+//    def close = {
+//      release
+//    }
+//  }
+
+  
+}



Mime
View raw message