activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r830830 [1/2] - in /activemq/sandbox/activemq-apollo: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-openwire/src/main/java/org/apache/activemq/openwire/ a...
Date Thu, 29 Oct 2009 02:55:35 GMT
Author: chirino
Date: Thu Oct 29 02:55:32 2009
New Revision: 830830

URL: http://svn.apache.org/viewvc?rev=830830&view=rev
Log:
big stomp refactor in progress:
 - it no longer depends on openwire.
 - Avoids utf8 encode/decodes
 - More helper methods in the Buffer class
 - The multi wireformat now works better
 - fixed a warning in webgen
 - Destination now provides a felixble parser class 


Added:
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/DefaultFrameTranslator.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/FrameTranslator.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompConnection.java
      - copied, changed from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompFrame.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java
      - copied, changed from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageEvaluationContext.java
      - copied, changed from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageEvaluationContext.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java
      - copied, changed from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompSslTransportFactory.java
      - copied, changed from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java
      - copied, changed from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java
      - copied, changed from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html
      - copied, changed from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html
Removed:
    activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/wireformat/
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/wireformat/
Modified:
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Destination.java
    activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
    activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/default
    activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/openwire
    activemq/sandbox/activemq-apollo/activemq-stomp/pom.xml
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
    activemq/sandbox/activemq-apollo/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
    activemq/sandbox/activemq-apollo/activemq-stomp/src/test/ide-resources/log4j.properties
    activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java
    activemq/sandbox/activemq-apollo/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java
    activemq/sandbox/activemq-apollo/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/AsciiBuffer.java
    activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
    activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/ByteArrayInputStream.java
    activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/buffer/UTF8Buffer.java
    activemq/sandbox/activemq-apollo/webgen/src/styles/activemq/main.template

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java Thu Oct 29 02:55:32 2009
@@ -62,7 +62,17 @@
             protocolHandler.onCommand(command);
         } else {
             try {
-                WireFormat wireformat = transport.getWireformat();
+                
+                WireFormat wireformat;
+                if( command instanceof WireFormat ) {
+                    // First command might be from the wire format decriminator, letting
+                    // us know what the actually wireformat is.
+                    wireformat = (WireFormat) command;
+                    command = null;
+                } else {
+                    wireformat = transport.getWireformat();
+                }
+                
                 try {
                     protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wireformat.getName());
                 } catch(Exception e) {
@@ -79,7 +89,9 @@
                     }
                 });
                 
-                protocolHandler.onCommand(command);
+                if( command!=null ) {
+                    protocolHandler.onCommand(command);
+                }
                 
             } catch (Exception e) {
                 onException(e);

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Destination.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Destination.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Destination.java Thu Oct 29 02:55:32 2009
@@ -20,15 +20,79 @@
 import java.util.HashSet;
 
 import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
+
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
 
 public interface Destination {
 
     AsciiBuffer getDomain();
-
     AsciiBuffer getName();
-
     Collection<Destination> getDestinations();
+    
+    public static class ParserOptions {
+        public AsciiBuffer defaultDomain;
+        public AsciiBuffer queuePrefix;
+        public AsciiBuffer topicPrefix;
+        public AsciiBuffer tempQueuePrefix;
+        public AsciiBuffer tempTopicPrefix;
+    }
 
+    static public class Parser {
+        
+        /**
+         * Parses a simple destination.
+         * 
+         * @param value
+         * @param options
+         * @return
+         */
+        public static Destination parse(AsciiBuffer value, ParserOptions options) {
+            if (options.queuePrefix!=null && value.startsWith(options.queuePrefix)) {
+                AsciiBuffer name = value.slice(options.queuePrefix.length, value.length).ascii();
+                return new SingleDestination(Router.QUEUE_DOMAIN, name);
+            } else if (options.topicPrefix!=null && value.startsWith(options.topicPrefix)) {
+                AsciiBuffer name = value.slice(options.topicPrefix.length, value.length).ascii();
+                return new SingleDestination(Router.TOPIC_DOMAIN, name);
+            } else if (options.tempQueuePrefix!=null && value.startsWith(options.tempQueuePrefix)) {
+                AsciiBuffer name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
+                return new SingleDestination(Router.TEMP_QUEUE_DOMAIN, name);
+            } else if (options.tempTopicPrefix!=null && value.startsWith(options.tempTopicPrefix)) {
+                AsciiBuffer name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
+                return new SingleDestination(Router.TEMP_TOPIC_DOMAIN, name);
+            } else {
+                if( options.defaultDomain==null ) {
+                    throw new IllegalArgumentException("Destination domain not provided: "+value);
+                }
+                return new SingleDestination(options.defaultDomain, value);
+            }
+        }
+        
+        /**
+         * Parses a destination which may or may not be a composite.
+         * 
+         * @param value
+         * @param options
+         * @param compositeSeparator
+         * @return
+         */
+        public static Destination parse(AsciiBuffer value, ParserOptions options, byte compositeSeparator) {
+            if( value == null ) {
+                return null;
+            }
+
+            if( value.contains(compositeSeparator) ) {
+                Buffer[] rc = value.split(compositeSeparator);
+                MultiDestination md = new MultiDestination();
+                for (Buffer buffer : rc) {
+                    md.add(parse(ascii(buffer), options));
+                }
+                return md;
+            }
+            return parse(ascii(value), options);
+        }        
+    }
+    
     public class SingleDestination implements Destination {
 
         private AsciiBuffer domain;

Modified: activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Thu Oct 29 02:55:32 2009
@@ -27,10 +27,9 @@
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.apollo.broker.Router;
-import org.apache.activemq.apollo.broker.VirtualHost;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
 import org.apache.activemq.dispatch.IDispatcher;
@@ -42,7 +41,7 @@
 
 public abstract class BrokerTestBase extends TestCase {
 
-    protected static final int PERFORMANCE_SAMPLES = 3;
+    protected static final int PERFORMANCE_SAMPLES = 30000;
 
     protected static final int IO_WORK_AMOUNT = 0;
     protected static final int FANIN_COUNT = 10;
@@ -98,23 +97,18 @@
         dispatcher = createDispatcher();
         dispatcher.start();
         
-        String brokerWireFormat = getRemoteWireFormat();
-        if(getSupportedWireFormats() != null)
-        {
-            brokerWireFormat= "multi&wireFormat.wireFormats=" + getSupportedWireFormats();
-        }
-        
         if (tcp) {
-            sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + brokerWireFormat;
-            receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + brokerWireFormat;
+            sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
+            receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
+            
             sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
-            receiveBrokerConnectURI = "tcp://localhost:20000" + getRemoteWireFormat();
+            receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat();
         } else {
             sendBrokerConnectURI = "pipe://SendBroker";
             receiveBrokerConnectURI = "pipe://ReceiveBroker";
             if (forceMarshalling) {
-                sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getRemoteWireFormat();
-                receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getRemoteWireFormat();
+                sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+                receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
             } else {
                 sendBrokerBindURI = sendBrokerConnectURI;
                 receiveBrokerBindURI = receiveBrokerConnectURI;
@@ -122,8 +116,8 @@
         }
     }
 
-    protected String getSupportedWireFormats() {
-        return null;
+    protected String getBrokerWireFormat() {
+        return "multi";
     }
 
     protected abstract String getRemoteWireFormat();

Modified: activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Thu Oct 29 02:55:32 2009
@@ -25,6 +25,7 @@
  */
 public class OpenWireFormatFactory implements WireFormatFactory {
 
+    private static final Buffer MAGIC = new Buffer(new byte[] { 1, 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' });
     //
     // The default values here are what the wire format changes to after a
     // default negotiation.
@@ -138,14 +139,17 @@
     }
     
     public boolean isDiscriminatable() {
-        return false;
+        return true;
     }
 
-    public boolean matchesWireformatHeader(Buffer byteSequence) {
-        throw new UnsupportedOperationException();
+    public boolean matchesWireformatHeader(Buffer buffer) {
+        if (buffer.length == 4 + MAGIC.length) {
+            return buffer.containsAt(MAGIC, 4);
+        }
+        return false;
     }
 
     public int maxWireformatHeaderLength() {
-        throw new UnsupportedOperationException();
+        return 4 + MAGIC.length;
     }
 }

Modified: activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/default
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/default?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/default (original)
+++ activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/default Thu Oct 29 02:55:32 2009
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.wireformat.DiscriminatableOpenWireFormatFactory
+class=org.apache.activemq.openwire.OpenWireFormatFactory

Modified: activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/openwire
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/openwire?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/openwire (original)
+++ activemq/sandbox/activemq-apollo/activemq-openwire/src/main/resources/META-INF/services/org/apache/activemq/wireformat/openwire Thu Oct 29 02:55:32 2009
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-class=org.apache.activemq.wireformat.DiscriminatableOpenWireFormatFactory
+class=org.apache.activemq.openwire.OpenWireFormatFactory

Modified: activemq/sandbox/activemq-apollo/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/pom.xml?rev=830830&r1=830829&r2=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/pom.xml Thu Oct 29 02:55:32 2009
@@ -33,25 +33,40 @@
 
   <dependencies>
 
-    <!-- We should try to remove this depdency -->
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-openwire</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>com.thoughtworks.xstream</groupId>
       <artifactId>xstream</artifactId>
     </dependency>        
-    
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-bio</artifactId>
+    </dependency>
+
     <!-- Testing Dependencies -->
     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-broker</artifactId>
+      <version>${activemq-version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-util</artifactId>
+      <version>${activemq-version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-jaxb</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -61,7 +76,7 @@
       <artifactId>log4j</artifactId>
       <scope>test</scope>
     </dependency>
-
+    
   </dependencies>
 
 </project>

Added: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/DefaultFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/DefaultFrameTranslator.java?rev=830830&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/DefaultFrameTranslator.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/DefaultFrameTranslator.java Thu Oct 29 02:55:32 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp;
+
+import org.apache.activemq.apollo.broker.Destination;
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.broker.Destination.Parser;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+
+/**
+ * Implements ActiveMQ 4.0 translations
+ */
+public class DefaultFrameTranslator implements FrameTranslator {
+    
+    private static final Destination.ParserOptions PARSER_OPTIONS = new Destination.ParserOptions();
+    static {
+        PARSER_OPTIONS.defaultDomain = Router.QUEUE_DOMAIN;
+        PARSER_OPTIONS.queuePrefix = new AsciiBuffer("/queue/");
+        PARSER_OPTIONS.topicPrefix = new AsciiBuffer("/topic/");
+        PARSER_OPTIONS.tempQueuePrefix = new AsciiBuffer("/remote-temp-queue/");
+        PARSER_OPTIONS.tempTopicPrefix = new AsciiBuffer("/remote-temp-topic/");
+    }
+    
+    public Destination convert(AsciiBuffer dest) {
+        return Parser.parse(dest, PARSER_OPTIONS);
+    }
+	
+//    public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame command) throws JMSException, ProtocolException {
+//        final Map<String, String> headers = command.getHeaders();
+//        final ActiveMQMessage msg;
+//        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
+//            headers.remove(Stomp.Headers.CONTENT_LENGTH);
+//            ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
+//            bm.writeBytes(command.getContent());
+//            msg = bm;
+//        } else {
+//            ActiveMQTextMessage text = new ActiveMQTextMessage();
+//            try {
+//                text.setText(new String(command.getContent(), "UTF-8"));
+//            } catch (Throwable e) {
+//                throw new ProtocolException("Text could not bet set: " + e, false, e);
+//            }
+//            msg = text;
+//        }
+//        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+//        return msg;
+//    }
+//
+//    public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, ActiveMQMessage message) throws IOException, JMSException {
+//        StompFrame command = new StompFrame();
+//        command.setAction(Stomp.Responses.MESSAGE);
+//        Map<String, String> headers = new HashMap<String, String>(25);
+//        command.setHeaders(headers);
+//
+//        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
+//
+//        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+//
+//            ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+//            command.setContent(msg.getText().getBytes("UTF-8"));
+//
+//        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
+//
+//            ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+//            msg.setReadOnlyBody(true);
+//            byte[] data = new byte[(int)msg.getBodyLength()];
+//            msg.readBytes(data);
+//
+//            headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
+//            command.setContent(data);
+//        }
+//        return command;
+//    }
+//
+//    public String convertFromOpenwireDestination(StompProtocolHandler converter, ActiveMQDestination activeMQDestination) {
+//        if (activeMQDestination == null) {
+//            return null;
+//        }
+//        String physicalName = activeMQDestination.getPhysicalName();
+//
+//        String rc = converter.getCreatedTempDestinationName(activeMQDestination);
+//        if( rc!=null ) {
+//        	return rc;
+//        }
+//        
+//        StringBuffer buffer = new StringBuffer();
+//        if (activeMQDestination.isQueue()) {
+//            if (activeMQDestination.isTemporary()) {
+//                buffer.append("/remote-temp-queue/");
+//            } else {
+//                buffer.append("/queue/");
+//            }
+//        } else {
+//            if (activeMQDestination.isTemporary()) {
+//                buffer.append("/remote-temp-topic/");
+//            } else {
+//                buffer.append("/topic/");
+//            }
+//        }
+//        buffer.append(physicalName);
+//        return buffer.toString();
+//    }
+//
+//    public ActiveMQDestination convertToOpenwireDestination(StompProtocolHandler converter, String name) throws ProtocolException {
+//        if (name == null) {
+//            return null;
+//        } else if (name.startsWith("/queue/")) {
+//            String qName = name.substring("/queue/".length(), name.length());
+//            return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
+//        } else if (name.startsWith("/topic/")) {
+//            String tName = name.substring("/topic/".length(), name.length());
+//            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
+//        } else if (name.startsWith("/remote-temp-queue/")) {
+//            String tName = name.substring("/remote-temp-queue/".length(), name.length());
+//            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
+//        } else if (name.startsWith("/remote-temp-topic/")) {
+//            String tName = name.substring("/remote-temp-topic/".length(), name.length());
+//            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
+//        } else if (name.startsWith("/temp-queue/")) {
+//            return converter.createTempQueue(name);
+//        } else if (name.startsWith("/temp-topic/")) {
+//            return converter.createTempTopic(name);
+//        } else {
+//            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+//                                        + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+//        }
+//    }
+//
+//    public String convertFromDestination(StompProtocolHandler converter, Destination d) throws ProtocolException {
+//        if (d == null) {
+//            return null;
+//        }
+//        
+//        StringBuffer buffer = new StringBuffer();
+//        if( d.getDomain().equals(Router.QUEUE_DOMAIN) ) {
+//            buffer.append("/queue/");
+//        } else if( d.getDomain().equals(Router.QUEUE_DOMAIN) ) {
+//            buffer.append("/topic/");
+//        } else {
+//            throw new ProtocolException("Illegal destination: Stomp can only handle queue or topic Domains");
+//        }
+//        
+//        buffer.append(d.getName().toString());
+//        return buffer.toString();
+//    }
+//
+//    public Destination convertToDestination(StompProtocolHandler converter, String name) throws ProtocolException {
+//        if (name == null) {
+//            return null;
+//        } else if (name.startsWith("/queue/")) {
+//            String qName = name.substring("/queue/".length(), name.length());
+//            return new Destination.SingleDestination(Router.QUEUE_DOMAIN, new AsciiBuffer(qName));
+//        } else if (name.startsWith("/topic/")) {
+//            String tName = name.substring("/topic/".length(), name.length());
+//            return new Destination.SingleDestination(Router.TOPIC_DOMAIN, new AsciiBuffer(tName));
+//        } else if (name.startsWith("/remote-temp-queue/")) {
+//            throw new UnsupportedOperationException();
+//        } else if (name.startsWith("/remote-temp-topic/")) {
+//            throw new UnsupportedOperationException();
+//        } else if (name.startsWith("/temp-queue/")) {
+//            throw new UnsupportedOperationException();
+//        } else if (name.startsWith("/temp-topic/")) {
+//            throw new UnsupportedOperationException();
+//        } else {
+//            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+//                                        + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+//        }
+//    }
+}

Added: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/FrameTranslator.java?rev=830830&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/FrameTranslator.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/FrameTranslator.java Thu Oct 29 02:55:32 2009
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp;
+
+import org.apache.activemq.apollo.broker.Destination;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+
+/**
+ * Implementations of this interface are used to map back and forth from Stomp
+ * to ActiveMQ. There are several standard mappings which are semantically the
+ * same, the inner class, Helper, provides functions to copy those properties
+ * from one to the other
+ */
+public interface FrameTranslator {
+
+    Destination convert(AsciiBuffer dest);
+//    
+//    ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame frame) throws JMSException, ProtocolException;
+//    StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, ActiveMQMessage message) throws IOException, JMSException;
+//    
+//    String convertFromOpenwireDestination(StompProtocolHandler converter, ActiveMQDestination d);
+//    ActiveMQDestination convertToOpenwireDestination(StompProtocolHandler converter, String name) throws ProtocolException;
+//
+//    String convertFromDestination(StompProtocolHandler converter, Destination d) throws ProtocolException;
+//    Destination convertToDestination(StompProtocolHandler converter, String name) throws ProtocolException;
+//
+//    /**
+//     * Helper class which holds commonly needed functions used when implementing
+//     * FrameTranslators
+//     */
+//    static final class Helper {
+//
+//        private Helper() {
+//        }
+//
+//        public static void copyStandardHeadersFromMessageToFrame(StompProtocolHandler converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
+//            final Map<String, String> headers = command.getHeaders();
+//            headers.put(Stomp.Headers.Message.DESTINATION, ft.convertFromOpenwireDestination(converter, message.getDestination()));
+//            headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
+//
+//            if (message.getJMSCorrelationID() != null) {
+//                headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
+//            }
+//            headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getJMSExpiration());
+//
+//            if (message.getJMSRedelivered()) {
+//                headers.put(Stomp.Headers.Message.REDELIVERED, "true");
+//            }
+//            headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority());
+//
+//            if (message.getJMSReplyTo() != null) {
+//                headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertFromOpenwireDestination(converter, (ActiveMQDestination) message.getJMSReplyTo()));
+//            }
+//            headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp());
+//
+//            if (message.getJMSType() != null) {
+//                headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
+//            }
+//
+//            // now lets add all the message headers
+//            final Map<String, Object> properties = message.getProperties();
+//            if (properties != null) {
+//                for (Map.Entry<String, Object> prop : properties.entrySet()) {
+//                    headers.put(prop.getKey(), "" + prop.getValue());
+//                }
+//            }
+//        }
+//
+//        public static void copyStandardHeadersFromFrameToMessage(StompProtocolHandler converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
+//            final Map<String, String> headers = new HashMap<String, String>(command.getHeaders());
+//            final String destination = headers.remove(Stomp.Headers.Send.DESTINATION);
+//            msg.setDestination(ft.convertToOpenwireDestination(converter, destination));
+//
+//            // the standard JMS headers
+//            msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+//
+//            Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
+//            if (o != null) {
+//                msg.setJMSExpiration(Long.parseLong((String)o));
+//            }
+//
+//            o = headers.remove(Stomp.Headers.Send.PRIORITY);
+//            if (o != null) {
+//                msg.setJMSPriority(Integer.parseInt((String)o));
+//            }
+//
+//            o = headers.remove(Stomp.Headers.Send.TYPE);
+//            if (o != null) {
+//                msg.setJMSType((String)o);
+//            }
+//
+//            o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+//            if (o != null) {
+//                msg.setJMSReplyTo(ft.convertToOpenwireDestination(converter, (String)o));
+//            }
+//
+//            o = headers.remove(Stomp.Headers.Send.PERSISTENT);
+//            if (o != null) {
+//                msg.setPersistent("true".equals(o));
+//            }
+//
+//            // now the general headers
+//            msg.setProperties(headers);
+//        }
+//    }
+}

Added: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java?rev=830830&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java Thu Oct 29 02:55:32 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp;
+
+import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
+
+public interface Stomp {
+    
+    Buffer EMPTY_BUFFER = new Buffer(0);
+    byte NULL = 0;
+    Buffer NULL_BUFFER = new Buffer(new byte[]{NULL});
+    byte NEWLINE = '\n';
+    Buffer NEWLINE_BUFFER = new Buffer(new byte[]{NEWLINE});
+    Buffer END_OF_FRAME_BUFFER = new Buffer(new byte[]{NULL, NEWLINE});
+    
+    AsciiBuffer TRUE = new AsciiBuffer("true");
+    AsciiBuffer FALSE = new AsciiBuffer("false");
+
+    public static interface Commands {
+        AsciiBuffer CONNECT = new AsciiBuffer("CONNECT");
+        AsciiBuffer SEND = new AsciiBuffer("SEND");
+        AsciiBuffer DISCONNECT = new AsciiBuffer("DISCONNECT");
+        AsciiBuffer SUBSCRIBE = new AsciiBuffer("SUB");
+        AsciiBuffer UNSUBSCRIBE = new AsciiBuffer("UNSUB");
+
+        AsciiBuffer BEGIN_TRANSACTION = new AsciiBuffer("BEGIN");
+        AsciiBuffer COMMIT_TRANSACTION = new AsciiBuffer("COMMIT");
+        AsciiBuffer ABORT_TRANSACTION = new AsciiBuffer("ABORT");
+        AsciiBuffer BEGIN = new AsciiBuffer("BEGIN");
+        AsciiBuffer COMMIT = new AsciiBuffer("COMMIT");
+        AsciiBuffer ABORT = new AsciiBuffer("ABORT");
+        AsciiBuffer ACK = new AsciiBuffer("ACK");
+    }
+
+    public interface Responses {
+        AsciiBuffer CONNECTED = new AsciiBuffer("CONNECTED");
+        AsciiBuffer ERROR = new AsciiBuffer("ERROR");
+        AsciiBuffer MESSAGE = new AsciiBuffer("MESSAGE");
+        AsciiBuffer RECEIPT = new AsciiBuffer("RECEIPT");
+    }
+
+    public interface Headers {
+        byte SEPERATOR = ':';
+        Buffer SEPERATOR_BUFFER = new Buffer(new byte[]{SEPERATOR});
+        
+        AsciiBuffer RECEIPT_REQUESTED = new AsciiBuffer("receipt");
+        AsciiBuffer TRANSACTION = new AsciiBuffer("transaction");
+        AsciiBuffer CONTENT_LENGTH = new AsciiBuffer("content-length");
+        AsciiBuffer TRANSFORMATION = new AsciiBuffer("transformation");
+        AsciiBuffer TRANSFORMATION_ERROR = new AsciiBuffer("transformation-error");
+
+        public interface Response {
+            AsciiBuffer RECEIPT_ID = new AsciiBuffer("receipt-id");
+        }
+
+        public interface Send {
+            AsciiBuffer DESTINATION = new AsciiBuffer("destination");
+            AsciiBuffer CORRELATION_ID = new AsciiBuffer("correlation-id");
+            AsciiBuffer REPLY_TO = new AsciiBuffer("reply-to");
+            AsciiBuffer EXPIRATION_TIME = new AsciiBuffer("expires");
+            AsciiBuffer PRIORITY = new AsciiBuffer("priority");
+            AsciiBuffer TYPE = new AsciiBuffer("type");
+            AsciiBuffer PERSISTENT = new AsciiBuffer("persistent");
+        }
+
+        public interface Message {
+            AsciiBuffer MESSAGE_ID = new AsciiBuffer("message-id");
+            AsciiBuffer DESTINATION = new AsciiBuffer("destination");
+            AsciiBuffer CORRELATION_ID = new AsciiBuffer("correlation-id");
+            AsciiBuffer EXPIRATION_TIME = new AsciiBuffer("expires");
+            AsciiBuffer REPLY_TO = new AsciiBuffer("reply-to");
+            AsciiBuffer PRORITY = new AsciiBuffer("priority");
+            AsciiBuffer REDELIVERED = new AsciiBuffer("redelivered");
+            AsciiBuffer TIMESTAMP = new AsciiBuffer("timestamp");
+            AsciiBuffer TYPE = new AsciiBuffer("type");
+            AsciiBuffer SUBSCRIPTION = new AsciiBuffer("subscription");
+        }
+
+        public interface Subscribe {
+            AsciiBuffer DESTINATION = new AsciiBuffer("destination");
+            AsciiBuffer ACK_MODE = new AsciiBuffer("ack");
+            AsciiBuffer ID = new AsciiBuffer("id");
+            AsciiBuffer SELECTOR = new AsciiBuffer("selector");
+
+            public interface AckModeValues {
+                AsciiBuffer AUTO = new AsciiBuffer("auto");
+                AsciiBuffer CLIENT = new AsciiBuffer("client");
+                AsciiBuffer INDIVIDUAL = new AsciiBuffer("client-individual");
+            }
+        }
+
+        public interface Unsubscribe {
+            AsciiBuffer DESTINATION = new AsciiBuffer("destination");
+            AsciiBuffer ID = new AsciiBuffer("id");
+        }
+
+        public interface Connect {
+            AsciiBuffer LOGIN = new AsciiBuffer("login");
+            AsciiBuffer PASSCODE = new AsciiBuffer("passcode");
+            AsciiBuffer CLIENT_ID = new AsciiBuffer("client-id");
+            AsciiBuffer REQUEST_ID = new AsciiBuffer("request-id");
+        }
+
+        public interface Error {
+            AsciiBuffer MESSAGE = new AsciiBuffer("message");
+        }
+
+        public interface Connected {
+            AsciiBuffer SESSION = new AsciiBuffer("session");
+            AsciiBuffer RESPONSE_ID = new AsciiBuffer("response-id");
+        }
+
+        public interface Ack {
+            AsciiBuffer MESSAGE_ID = new AsciiBuffer("message-id");
+        }
+    }
+    
+	public enum Transformations {
+		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
+		
+		public String toString() {
+			return name().replaceAll("_", "-").toLowerCase();
+		}
+		
+		public static Transformations getValue(String value) {
+			return valueOf(value.replaceAll("-", "_").toUpperCase());
+		}
+	}    
+}

Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompConnection.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompConnection.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompConnection.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompConnection.java Thu Oct 29 02:55:32 2009
@@ -15,95 +15,60 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.apollo.stomp;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.HashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
-import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+
+import static org.apache.activemq.apollo.stomp.StompWireFormat.*;
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
+import static org.apache.activemq.util.buffer.UTF8Buffer.*;
 
-public class StompConnection {
 
-    public static final long RECEIVE_TIMEOUT = 10000;
+public class StompConnection {
 
-    private Socket stompSocket;
-    private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
+    private Socket socket;
+    private BufferedInputStream is;
+    private BufferedOutputStream os;
     
     public void open(String host, int port) throws IOException, UnknownHostException {
         open(new Socket(host, port));
     }
     
-    public void open(Socket socket) {
-    	stompSocket = socket;
+    public void open(Socket socket) throws IOException {
+    	this.socket = socket;
+    	this.is = new BufferedInputStream(socket.getInputStream());
+        this.os = new BufferedOutputStream(socket.getOutputStream());
     }
 
     public void close() throws IOException {
-        if (stompSocket != null) {
-            stompSocket.close();
-            stompSocket = null;
+        if (socket != null) {
+            socket.close();
+            socket = null;
         }
     }
 
-    public void sendFrame(String data) throws Exception {
-        byte[] bytes = data.getBytes("UTF-8");
-        OutputStream outputStream = stompSocket.getOutputStream();
-        outputStream.write(bytes);
-        outputStream.write(0);
-        outputStream.flush();
+    public void sendFrame(StompFrame frame) throws Exception {
+        write(frame, os);
+        os.flush();
     }
     
     public StompFrame receive() throws Exception {
-        return receive(RECEIVE_TIMEOUT);
+        return read(is);
     }    
     
-    public StompFrame receive(long timeOut) throws Exception {
-    	stompSocket.setSoTimeout((int)timeOut);
-    	InputStream is = stompSocket.getInputStream();
-        StompWireFormat wf = new StompWireFormat();
-        DataInputStream dis = new DataInputStream(is);
-        return (StompFrame)wf.unmarshal(dis);
-    }
-
-    public String receiveFrame() throws Exception {
-        return receiveFrame(RECEIVE_TIMEOUT);
-    }
-
-    public String receiveFrame(long timeOut) throws Exception {
-        stompSocket.setSoTimeout((int)timeOut);
-        InputStream is = stompSocket.getInputStream();
-        int c = 0;
-        for (;;) {
-            c = is.read();
-            if (c < 0) {
-                throw new IOException("socket closed.");
-            } else if (c == 0) {
-                c = is.read();
-                if (c != '\n') {
-                    throw new IOException("Expecting stomp frame to terminate with \0\n");
-                }
-                byte[] ba = inputBuffer.toByteArray();
-                inputBuffer.reset();
-                return new String(ba, "UTF-8");
-            } else {
-                inputBuffer.write(c);
-            }
-        }
-    }
-
 	public Socket getStompSocket() {
-		return stompSocket;
+		return socket;
 	}
 
 	public void setStompSocket(Socket stompSocket) {
-		this.stompSocket = stompSocket;
+		this.socket = stompSocket;
 	}
 	
     public void connect(String username, String password) throws Exception {
@@ -111,40 +76,40 @@
     }
 	
     public void connect(String username, String password, String client) throws Exception {
-    	HashMap<String, String> headers = new HashMap();
-    	headers.put("login", username);
-    	headers.put("passcode", password);
+    	HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
+    	headers.put(Stomp.Headers.Connect.LOGIN, ascii(username));
+    	headers.put(Stomp.Headers.Connect.PASSCODE, ascii(password));
     	if (client != null) {
-    		headers.put("client-id", client);
+    		headers.put(Stomp.Headers.Connect.CLIENT_ID, ascii(client));
     	}
-    	StompFrame frame = new StompFrame("CONNECT", headers);
-        sendFrame(frame.toString());
+    	StompFrame frame = new StompFrame(Stomp.Commands.CONNECT, headers);
+        sendFrame(frame);
         
         StompFrame connect = receive();
         if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
-        	throw new Exception ("Not connected: " + connect.getBody());
+        	throw new Exception ("Not connected: " + utf8(connect.getContent()));
         }
     }
     
     public void disconnect() throws Exception {
-    	StompFrame frame = new StompFrame("DISCONNECT");
-        sendFrame(frame.toString());    	
+    	StompFrame frame = new StompFrame(Stomp.Commands.DISCONNECT);
+        sendFrame(frame);
     }
     
     public void send(String destination, String message) throws Exception {
     	send(destination, message, null, null);
     }
     
-    public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
+    public void send(String destination, String message, String transaction, HashMap<AsciiBuffer, AsciiBuffer> headers) throws Exception {
     	if (headers == null) {
-    		headers = new HashMap<String, String>();
+    		headers = new HashMap<AsciiBuffer, AsciiBuffer>();
     	}
-    	headers.put("destination", destination);
+    	headers.put(Stomp.Headers.Send.DESTINATION, ascii(destination));
     	if (transaction != null) {
-    		headers.put("transaction", transaction);
+    		headers.put(Stomp.Headers.TRANSACTION, ascii(transaction));
     	}
-    	StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
-        sendFrame(frame.toString());    	
+    	StompFrame frame = new StompFrame(Stomp.Commands.SEND, headers, utf8(message));
+        sendFrame(frame);
     }
     
     public void subscribe(String destination) throws Exception {
@@ -152,61 +117,61 @@
     }
     
     public void subscribe(String destination, String ack) throws Exception {
-    	subscribe(destination, ack, new HashMap<String, String>());
+    	subscribe(destination, ack, new HashMap<AsciiBuffer, AsciiBuffer>());
     }
     
-    public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
+    public void subscribe(String destination, String ack, HashMap<AsciiBuffer, AsciiBuffer> headers) throws Exception {
 		if (headers == null) {
-			headers = new HashMap<String, String>();
+			headers = new HashMap<AsciiBuffer, AsciiBuffer>();
 		}
-		headers.put("destination", destination);
+		headers.put(Stomp.Headers.Subscribe.DESTINATION, ascii(destination));
     	if (ack != null) {
-    		headers.put("ack", ack);
+    		headers.put(Stomp.Headers.Subscribe.ACK_MODE, ascii(ack));
     	}
-    	StompFrame frame = new StompFrame("SUBSCRIBE", headers);
-        sendFrame(frame.toString());    	
+    	StompFrame frame = new StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+        sendFrame(frame);
     }
     
     public void unsubscribe(String destination) throws Exception {
     	unsubscribe(destination, null);
     }
     
-    public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
+    public void unsubscribe(String destination, HashMap<AsciiBuffer, AsciiBuffer> headers) throws Exception {
 		if (headers == null) {
-			headers = new HashMap<String, String>();
+			headers = new HashMap<AsciiBuffer, AsciiBuffer>();
 		}
-		headers.put("destination", destination);
-    	StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
-        sendFrame(frame.toString());    	
+		headers.put(Stomp.Headers.Unsubscribe.DESTINATION, ascii(destination));
+    	StompFrame frame = new StompFrame(Stomp.Commands.UNSUBSCRIBE, headers);
+        sendFrame(frame);
     }    
     
     public void begin(String transaction) throws Exception {
-    	HashMap<String, String> headers = new HashMap<String, String>();
-    	headers.put("transaction", transaction);
-    	StompFrame frame = new StompFrame("BEGIN", headers);
-    	sendFrame(frame.toString());
+    	HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
+    	headers.put(Stomp.Headers.TRANSACTION, ascii(transaction));
+    	StompFrame frame = new StompFrame(Stomp.Commands.BEGIN, headers);
+        sendFrame(frame);
     }
     
     public void abort(String transaction) throws Exception {
-    	HashMap<String, String> headers = new HashMap<String, String>();
-    	headers.put("transaction", transaction);
-    	StompFrame frame = new StompFrame("ABORT", headers);
-    	sendFrame(frame.toString());
+    	HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
+    	headers.put(Stomp.Headers.TRANSACTION, ascii(transaction));
+    	StompFrame frame = new StompFrame(Stomp.Commands.ABORT, headers);
+        sendFrame(frame);
     }
     
     public void commit(String transaction) throws Exception {
-    	HashMap<String, String> headers = new HashMap<String, String>();
-    	headers.put("transaction", transaction);
-    	StompFrame frame = new StompFrame("COMMIT", headers);
-    	sendFrame(frame.toString());
+    	HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
+    	headers.put(Stomp.Headers.TRANSACTION, ascii(transaction));
+    	StompFrame frame = new StompFrame(Stomp.Commands.COMMIT, headers);
+    	sendFrame(frame);
     }
     
     public void ack(StompFrame frame) throws Exception {
-    	ack(frame.getHeaders().get("message-id"), null);
+    	ack(frame.get(Stomp.Headers.Ack.MESSAGE_ID), null);
     }    
     
     public void ack(StompFrame frame, String transaction) throws Exception {
-    	ack(frame.getHeaders().get("message-id"), transaction);
+    	ack(frame.get(Stomp.Headers.Ack.MESSAGE_ID), transaction);
     }
     
     public void ack(String messageId) throws Exception {
@@ -214,21 +179,16 @@
     }
     
     public void ack(String messageId, String transaction) throws Exception {
-    	HashMap<String, String> headers = new HashMap<String, String>();
-    	headers.put("message-id", messageId);
-    	if (transaction != null)
-    		headers.put("transaction", transaction);
-    	StompFrame frame = new StompFrame("ACK", headers);
-    	sendFrame(frame.toString());	
+        ack(ascii(messageId), transaction);
     }
     
-    protected String appendHeaders(HashMap<String, Object> headers) {
-    	StringBuffer result = new StringBuffer();
-    	for (String key : headers.keySet()) {
-    		result.append(key + ":" + headers.get(key) + "\n");
-    	}
-    	result.append("\n");
-    	return result.toString();
+    private void ack(AsciiBuffer messageId, String transaction) throws Exception {
+    	HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
+    	headers.put(Stomp.Headers.Ack.MESSAGE_ID, messageId);
+    	if (transaction != null)
+    		headers.put(Stomp.Headers.TRANSACTION, ascii(transaction));
+    	StompFrame frame = new StompFrame(Stomp.Commands.ACK, headers);
+    	sendFrame(frame);	
     }
 
 }

Added: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompFrame.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompFrame.java?rev=830830&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompFrame.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompFrame.java Thu Oct 29 02:55:32 2009
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
+
+/**
+ * Represents all the data in a STOMP frame.
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrame {
+
+    public static final Buffer NO_DATA = new Buffer(0);
+
+    private AsciiBuffer action;
+    private Map<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
+    private Buffer content = NO_DATA;
+
+    public StompFrame(AsciiBuffer command) {
+    	this(command, null, null);
+    }
+    
+    public StompFrame(AsciiBuffer command, Map<AsciiBuffer, AsciiBuffer> headers) {
+    	this(command, headers, null);
+    }    
+    
+    public StompFrame(AsciiBuffer command, Map<AsciiBuffer, AsciiBuffer> headers, Buffer data) {
+        this.action = command;
+        if (headers != null)
+        	this.headers = headers;
+        if (data != null)
+        	this.content = data;
+    }
+    
+    public StompFrame() {
+    }
+
+    public AsciiBuffer getAction() {
+        return action;
+    }
+
+    public void setAction(AsciiBuffer command) {
+        this.action = command;
+    }
+
+    public Buffer getContent() {
+        return content;
+    }
+    
+    public void setContent(Buffer data) {
+        this.content = data;
+    }
+
+    public AsciiBuffer get(AsciiBuffer header) {
+        return headers.get(header);
+    }
+    
+    public AsciiBuffer put(AsciiBuffer key, AsciiBuffer value) {
+        return headers.put(key, value);
+    }
+
+    public Map<AsciiBuffer, AsciiBuffer> getHeaders() {
+        return headers;
+    }
+
+    public void setHeaders(Map<AsciiBuffer, AsciiBuffer> headers) {
+        this.headers = headers;
+    }
+
+    public String toString() {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(getAction());
+        buffer.append("\n");
+        
+        for (Entry<AsciiBuffer, AsciiBuffer> entry : headers.entrySet()) {
+            buffer.append(entry.getKey());
+            buffer.append(":");
+            buffer.append(entry.getValue());
+            buffer.append("\n");
+        }
+
+        buffer.append("\n");
+        if (getContent() != null) {
+            try {
+                buffer.append(getContent());
+            } catch (Throwable e) {
+            }
+        }
+        return buffer.toString();
+    }
+
+}

Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java Thu Oct 29 02:55:32 2009
@@ -14,24 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.stomp;
+package org.apache.activemq.apollo.stomp;
 
 import org.apache.activemq.apollo.broker.BrokerMessageDelivery;
 import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 
 public class StompMessageDelivery extends BrokerMessageDelivery {
-
+    
     static final private AsciiBuffer ENCODING = new AsciiBuffer("stomp");
 
     private final StompFrame frame;
     private Destination destination;
     private Runnable completionCallback;
-    private String receiptId;
     private int priority = Integer.MIN_VALUE;
     private AsciiBuffer msgId;
     private PersistListener persistListener = null;
@@ -44,7 +41,6 @@
     public StompMessageDelivery(StompFrame frame, Destination destiantion) {
         this.frame = frame;
         this.destination = destiantion;
-        this.receiptId = frame.getHeaders().remove(Stomp.Headers.RECEIPT_REQUESTED);
     }
 
     public void setPersistListener(PersistListener listener) {
@@ -61,9 +57,9 @@
 
     public int getPriority() {
         if (priority == Integer.MIN_VALUE) {
-            String p = frame.getHeaders().get(Stomp.Headers.Message.PRORITY);
+            AsciiBuffer p = frame.get(Stomp.Headers.Message.PRORITY);
             try {
-                priority = (p == null) ? 4 : Integer.parseInt(p);
+                priority = (p == null) ? 4 : Integer.parseInt(p.toString());
             } catch (NumberFormatException e) {
                 priority = 4;
             }
@@ -78,9 +74,9 @@
      */
     public long getExpiration() {
         if (tte == Long.MIN_VALUE) {
-            String t = frame.getHeaders().get(Stomp.Headers.Message.EXPIRATION_TIME);
+            AsciiBuffer t = frame.get(Stomp.Headers.Message.EXPIRATION_TIME);
             try {
-                tte = (t == null) ? -1 : Long.parseLong(t);
+                tte = (t == null) ? -1 : Long.parseLong(t.toString());
             } catch (NumberFormatException e) {
                 tte = 1;
             }
@@ -90,9 +86,9 @@
 
     public AsciiBuffer getMsgId() {
         if (msgId == null) {
-            String p = frame.getHeaders().get(Stomp.Headers.Message.MESSAGE_ID);
+            AsciiBuffer p = frame.get(Stomp.Headers.Message.MESSAGE_ID);
             if (p != null) {
-                msgId = new AsciiBuffer(p);
+                msgId = new AsciiBuffer(p.toString());
             }
         }
         return msgId;
@@ -117,21 +113,20 @@
         return null;
     }
 
-    public StompFrame getStomeFame() {
+    public StompFrame getStompFrame() {
         return frame;
     }
 
-    public String getReceiptId() {
-        return receiptId;
-    }
-
     public boolean isPersistent() {
-        String p = frame.getHeaders().get(Stomp.Headers.Send.PERSISTENT);
-        return "true".equals(p);
+        AsciiBuffer persistent = frame.get(Stomp.Headers.Send.PERSISTENT);
+        if( persistent==null ) {
+            return false;
+        }
+        return Stomp.TRUE.equals(persistent);
     }
 
     public boolean isResponseRequired() {
-        return receiptId != null;
+        return frame.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED);
     }
 
     public void onMessagePersisted() {
@@ -146,9 +141,7 @@
     }
     
     public Buffer getStoreEncoded() {
-        Buffer bytes;
-        //TODO encode it:
-        //return bytes;
+        // TODO:
         throw new UnsupportedOperationException();
     }
     

Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageEvaluationContext.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageEvaluationContext.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageEvaluationContext.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageEvaluationContext.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageEvaluationContext.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageEvaluationContext.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageEvaluationContext.java Thu Oct 29 02:55:32 2009
@@ -1,4 +1,4 @@
-package org.apache.activemq.broker.stomp;
+package org.apache.activemq.apollo.stomp;
 
 import org.apache.activemq.filter.Expression;
 import org.apache.activemq.filter.FilterException;

Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java Thu Oct 29 02:55:32 2009
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.stomp;
+package org.apache.activemq.apollo.stomp;
 
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -33,9 +32,12 @@
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.apollo.broker.ProtocolHandler;
 import org.apache.activemq.apollo.broker.Router;
-import org.apache.activemq.apollo.broker.ProtocolHandler.AbstractClientContext;
+import org.apache.activemq.apollo.stomp.Stomp.Headers;
+import org.apache.activemq.apollo.stomp.Stomp.Headers.Ack;
+import org.apache.activemq.apollo.stomp.Stomp.Headers.Response;
+import org.apache.activemq.apollo.stomp.Stomp.Headers.Send;
+import org.apache.activemq.apollo.stomp.Stomp.Headers.Subscribe;
 import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
@@ -43,37 +45,43 @@
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompFrame;
-import org.apache.activemq.transport.stomp.StompSubscription;
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.buffer.ByteArrayOutputStream;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static org.apache.activemq.apollo.stomp.Stomp.Commands.*;
+import static org.apache.activemq.apollo.stomp.Stomp.Headers.*;
+import static org.apache.activemq.apollo.stomp.Stomp.Responses.*;
+import static org.apache.activemq.selector.SelectorParser.*;
+import static org.apache.activemq.util.buffer.AsciiBuffer.*;
 
 public class StompProtocolHandler implements ProtocolHandler, StompMessageDelivery.PersistListener {
 
+    private static final Log LOG = LogFactory.getLog(StompProtocolHandler.class);
+    
     interface ActionHander {
         public void onStompFrame(StompFrame frame) throws Exception;
     }
 
     private InboundContext inboundContext;
 
-    protected final HashMap<String, ActionHander> actionHandlers = new HashMap<String, ActionHander>();
-    protected final HashMap<String, ConsumerContext> consumers = new HashMap<String, ConsumerContext>();
+    protected final HashMap<AsciiBuffer, ActionHander> actionHandlers = new HashMap<AsciiBuffer, ActionHander>();
+    protected final HashMap<AsciiBuffer, ConsumerContext> consumers = new HashMap<AsciiBuffer, ConsumerContext>();
 
     protected BrokerConnection connection;
 
     // TODO: need to update the FrameTranslator to normalize to new broker API
     // objects instead of to the openwire command set.
-    private final FrameTranslator translator = new LegacyFrameTranslator();
+    private final FrameTranslator translator = new DefaultFrameTranslator();
     private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/stomp/frametranslator/");
     private SingleFlowRelay<MessageDelivery> outboundQueue;
 
@@ -82,9 +90,9 @@
 
     protected FrameTranslator translator(StompFrame frame) {
         try {
-            String header = frame.getHeaders().get(Stomp.Headers.TRANSFORMATION);
+            AsciiBuffer header = frame.get(TRANSFORMATION);
             if (header != null) {
-                return (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header);
+                return (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header.toString());
             }
         } catch (Exception ignore) {
         }
@@ -92,53 +100,53 @@
     }
 
     public StompProtocolHandler() {
-        actionHandlers.put(Stomp.Commands.CONNECT, new ActionHander() {
+        actionHandlers.put(CONNECT, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
-                StompFrame response = new StompFrame(Stomp.Responses.CONNECTED);
+                StompFrame response = new StompFrame(CONNECTED);
                 connection.write(response);
             }
         });
-        actionHandlers.put(Stomp.Commands.SEND, new ActionHander() {
-
+        actionHandlers.put(SEND, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
-                String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
-                Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
+                AsciiBuffer dest = frame.get(Send.DESTINATION);
+                Destination destination = translator(frame).convert(dest);
 
-                frame.setAction(Stomp.Responses.MESSAGE);
+                frame.setAction(MESSAGE);
                 StompMessageDelivery md = new StompMessageDelivery(frame, destination);
                 inboundContext.onReceive(md);
             }
         });
-        actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander() {
+        actionHandlers.put(SUBSCRIBE, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
-                ConsumerContext ctx = new ConsumerContext(frame);
+                AsciiBuffer subscriptionId = frame.get(Subscribe.ID);
+                ConsumerContext ctx = new ConsumerContext(subscriptionId.toString(), frame);
                 consumers.put(ctx.stompDestination, ctx);
                 ack(frame);
             }
         });
-        actionHandlers.put(Stomp.Commands.UNSUBSCRIBE, new ActionHander() {
+        actionHandlers.put(UNSUBSCRIBE, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
-        actionHandlers.put(Stomp.Commands.ACK, new ActionHander() {
+        actionHandlers.put(ACK, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
-                frame.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID);
+                frame.get(Ack.MESSAGE_ID);
             }
         });
-        actionHandlers.put(Stomp.Commands.DISCONNECT, new ActionHander() {
+        actionHandlers.put(DISCONNECT, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
 
-        actionHandlers.put(Stomp.Commands.ABORT_TRANSACTION, new ActionHander() {
+        actionHandlers.put(ABORT_TRANSACTION, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
-        actionHandlers.put(Stomp.Commands.BEGIN_TRANSACTION, new ActionHander() {
+        actionHandlers.put(BEGIN_TRANSACTION, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
-        actionHandlers.put(Stomp.Commands.COMMIT_TRANSACTION, new ActionHander() {
+        actionHandlers.put(COMMIT_TRANSACTION, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
@@ -169,7 +177,7 @@
     public void onCommand(Object o) {
         StompFrame command = (StompFrame) o;
         try {
-            String action = command.getAction();
+            AsciiBuffer action = command.getAction();
             ActionHander actionHander = actionHandlers.get(action);
             if (actionHander == null) {
                 throw new IOException("Unsupported command: " + action);
@@ -185,17 +193,17 @@
                 error.printStackTrace(stream);
                 stream.close();
 
-                HashMap<String, String> headers = new HashMap<String, String>();
-                headers.put(Stomp.Headers.Error.MESSAGE, error.getMessage());
+                HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
+                headers.put(Headers.Error.MESSAGE, new AsciiBuffer(error.getMessage()));
 
                 if (command != null) {
-                    final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+                    final AsciiBuffer receiptId = command.get(RECEIPT_REQUESTED);
                     if (receiptId != null) {
-                        headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                        headers.put(Response.RECEIPT_ID, receiptId);
                     }
                 }
 
-                StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+                StompFrame errorMessage = new StompFrame(ERROR, headers, baos.toBuffer());
                 connection.write(errorMessage);
                 connection.stop();
             } catch (Exception ignore) {
@@ -205,6 +213,7 @@
 
     public void onException(Exception error) {
         if (!connection.isStopping()) {
+            LOG.debug("Unexpected exception.. closing..", error);
             try {
 
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -212,7 +221,7 @@
                 error.printStackTrace(stream);
                 stream.close();
 
-                sendError(error.getMessage(), baos.toByteArray());
+                sendError(new AsciiBuffer(error.getMessage()), baos.toBuffer());
                 connection.stop();
 
             } catch (Exception ignore) {
@@ -269,39 +278,39 @@
         public WindowLimiter<MessageDelivery> limiter;
         private FrameTranslator translator;
         private String subscriptionId;
-        private String stompDestination;
+        private AsciiBuffer stompDestination;
         private Destination destination;
-        private String ackMode;
+        private AsciiBuffer ackMode;
 
         private LinkedHashMap<AsciiBuffer, SubscriptionDelivery<MessageDelivery>> sentMessageIds = new LinkedHashMap<AsciiBuffer, SubscriptionDelivery<MessageDelivery>>();
 
         private boolean durable;
 
-        public ConsumerContext(final StompFrame subscribe) throws Exception {
-            super(subscribe.getHeaders().get(Stomp.Headers.Subscribe.ID), null);
+        public ConsumerContext(String id, final StompFrame subscribe) throws Exception {
+            super(id, null);
             translator = translator(subscribe);
 
-            Map<String, String> headers = subscribe.getHeaders();
-            stompDestination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
-            destination = translator.convertToDestination(StompProtocolHandler.this, stompDestination);
-            subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
+            Map<AsciiBuffer, AsciiBuffer> headers = subscribe.getHeaders();
+            stompDestination = headers.get(Subscribe.DESTINATION);
+            destination = translator.convert(stompDestination);
+            subscriptionId = string(headers.get(Subscribe.ID));
             
-            ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
-            if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
-                ackMode = StompSubscription.CLIENT_ACK;
-            } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
-                ackMode = StompSubscription.INDIVIDUAL_ACK;
-                sendError(StompSubscription.INDIVIDUAL_ACK + " not supported.");
+            AsciiBuffer requestedAckMode = headers.get(Subscribe.ACK_MODE);
+            if (Subscribe.AckModeValues.CLIENT.equals(requestedAckMode)) {
+                ackMode = Subscribe.AckModeValues.CLIENT;
+            } else if (Subscribe.AckModeValues.INDIVIDUAL.equals(requestedAckMode)) {
+                ackMode = Subscribe.AckModeValues.INDIVIDUAL;
+                sendError(Subscribe.AckModeValues.INDIVIDUAL + " not supported.");
                 connection.stop();
                 return;
             } else {
-                ackMode = StompSubscription.AUTO_ACK;
+                ackMode = Subscribe.AckModeValues.AUTO;
             }
 
-            selectorString = subscribe.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR);
+            selectorString = string(subscribe.get(Subscribe.SELECTOR));
             selector = parseSelector(selectorString);
 
-            if (ackMode != StompSubscription.AUTO_ACK) {
+            if (ackMode != Subscribe.AckModeValues.AUTO) {
                 Flow flow = new Flow("broker-" + subscriptionId + "-outbound", false);
                 limiter = new WindowLimiter<MessageDelivery>(true, flow, connection.getOutputWindowSize(), connection.getOutputResumeThreshold()) {
                     @Override
@@ -321,10 +330,10 @@
         }
 
         public void ack(StompFrame info) throws Exception {
-            if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {
+            if (ackMode == Subscribe.AckModeValues.CLIENT || ackMode == Subscribe.AckModeValues.INDIVIDUAL) {
                 int credits = 0;
                 synchronized (allSentMessageIds) {
-                    AsciiBuffer mid = new AsciiBuffer(info.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID));
+                    AsciiBuffer mid = new AsciiBuffer(info.get(Ack.MESSAGE_ID));
                     for (Iterator<AsciiBuffer> iterator = sentMessageIds.keySet().iterator(); iterator.hasNext();) {
                         AsciiBuffer next = iterator.next();
                         iterator.remove();
@@ -399,7 +408,7 @@
         private void addInternal(MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback)
         {
             StompFrame frame = message.asType(StompFrame.class);
-            if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {
+            if (ackMode == Subscribe.AckModeValues.CLIENT || ackMode == Subscribe.AckModeValues.INDIVIDUAL) {
                 synchronized (allSentMessageIds) {
                     AsciiBuffer msgId = message.getMsgId();
                     sentMessageIds.put(msgId, callback);
@@ -497,21 +506,13 @@
     }
 
     private void sendError(String message) {
-        sendError(message, StompFrame.NO_DATA);
-    }
-
-    private void sendError(String message, String details) {
-        try {
-            sendError(message, details.getBytes("UTF-8"));
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e);
-        }
+        sendError(ascii(message), StompFrame.NO_DATA);
     }
 
-    private void sendError(String message, byte[] details) {
-        HashMap<String, String> headers = new HashMap<String, String>();
-        headers.put(Stomp.Headers.Error.MESSAGE, message);
-        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, details);
+    private void sendError(AsciiBuffer message, Buffer details) {
+        HashMap<AsciiBuffer, AsciiBuffer> headers = new HashMap<AsciiBuffer, AsciiBuffer>();
+        headers.put(Headers.Error.MESSAGE, message);
+        StompFrame errorMessage = new StompFrame(ERROR, headers, details);
         connection.write(errorMessage);
     }
 
@@ -519,50 +520,28 @@
     // met.
     public void onMessagePersisted(StompMessageDelivery delivery) {
         // TODO this method must not block:
-        ack(delivery.getStomeFame());
+        ack(delivery.getStompFrame());
     }
 
     void ack(StompFrame frame) {
-        ack(frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+        ack(frame.get(RECEIPT_REQUESTED));
     }
 
-    private void ack(String receiptId) {
+    private void ack(AsciiBuffer receiptId) {
         if (receiptId != null) {
             StompFrame receipt = new StompFrame();
-            receipt.setAction(Stomp.Responses.RECEIPT);
-            receipt.setHeaders(new HashMap<String, String>(1));
-            receipt.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            receipt.setAction(RECEIPT);
+            receipt.setHeaders(new HashMap<AsciiBuffer, AsciiBuffer>(1));
+            receipt.put(Response.RECEIPT_ID, receiptId);
             connection.write(receipt);
         }
     }
 
-    static public Destination convert(ActiveMQDestination dest) {
-        if (dest.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
-            Destination.MultiDestination md = new Destination.MultiDestination();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                md.add(convert(compositeDestinations[i]));
-            }
-            return md;
-        }
-        AsciiBuffer domain;
-        if (dest.isQueue()) {
-            domain = Router.QUEUE_DOMAIN;
+    private static BooleanExpression parseSelector(String selectorString) throws FilterException {
+        if (selectorString == null) {
+            return null;
         }
-        if (dest.isTopic()) {
-            domain = Router.TOPIC_DOMAIN;
-        } else {
-            throw new IllegalArgumentException("Unsupported domain type: " + dest);
-        }
-        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
-    }
-
-    private static BooleanExpression parseSelector(String selector) throws FilterException {
-        BooleanExpression rc = null;
-        if (selector != null) {
-            rc = SelectorParser.parse(selector);
-        }
-        return rc;
+        return parse(selectorString);
     }
 
     public BrokerConnection getConnection() {
@@ -577,22 +556,9 @@
     public void setWireFormat(WireFormat wireFormat) {
     }
 
-    public String getCreatedTempDestinationName(ActiveMQDestination activeMQDestination) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    public ActiveMQDestination createTempQueue(String name) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    public ActiveMQDestination createTempTopic(String name) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
     public BrokerMessageDelivery createMessageDelivery(MessageRecord record) {
         throw new UnsupportedOperationException();
     }
+    
+
 }

Copied: activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompSslTransportFactory.java (from r830516, activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompSslTransportFactory.java?p2=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompSslTransportFactory.java&p1=activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java&r1=830516&r2=830830&rev=830830&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompSslTransportFactory.java Thu Oct 29 02:55:32 2009
@@ -14,35 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.stomp;
+package org.apache.activemq.apollo.stomp;
 
-import java.util.Map;
-
-import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.SslTransportFactory;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.apollo.broker.BrokerAware;
-import org.apache.activemq.apollo.broker.Broker;
 
 /**
  * A <a href="http://activemq.apache.org/stomp/">STOMP</a> over SSL transport factory
  * 
  */
-public class StompSslTransportFactory extends SslTransportFactory implements BrokerAware {
-    private Broker broker;
-
+public class StompSslTransportFactory extends SslTransportFactory {
     protected String getDefaultWireFormatType() {
         return "stomp";
     }
-
-    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), broker);
-        IntrospectionSupport.setProperties(transport, options);
-        return super.compositeConfigure(transport, format, options);
-    }
-
-    public void setBroker(Broker broker) {
-        this.broker = broker;
-    }
 }



Mime
View raw message