activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r781822 [1/2] - in /activemq/sandbox/activemq-flow: activemq-all/ activemq-stomp/ activemq-stomp/src/main/java/org/apache/activemq/broker/ activemq-stomp/src/main/java/org/apache/activemq/transport/ activemq-stomp/src/main/java/org/apache/a...
Date Thu, 04 Jun 2009 18:19:19 GMT
Author: chirino
Date: Thu Jun  4 18:19:18 2009
New Revision: 781822

URL: http://svn.apache.org/viewvc?rev=781822&view=rev
Log:
removing 5.x dependency in the stomp module.

Added:
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/package.html
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/xbean/
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/xbean/XBeanBrokerService.java
Modified:
    activemq/sandbox/activemq-flow/activemq-all/pom.xml
    activemq/sandbox/activemq-flow/activemq-stomp/pom.xml

Modified: activemq/sandbox/activemq-flow/activemq-all/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/pom.xml?rev=781822&r1=781821&r2=781822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-all/pom.xml Thu Jun  4 18:19:18 2009
@@ -35,6 +35,10 @@
   
     <dependency>
       <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-broker</artifactId>
     </dependency>
     <dependency>

Modified: activemq/sandbox/activemq-flow/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/pom.xml?rev=781822&r1=781821&r2=781822&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/pom.xml Thu Jun  4 18:19:18 2009
@@ -43,15 +43,12 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-core</artifactId>
-      <version>5.3-SNAPSHOT</version>
-    </dependency>
-
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-context</artifactId>
+    </dependency>    
     <dependency>
       <groupId>com.thoughtworks.xstream</groupId>
       <artifactId>xstream</artifactId>
-      <optional>true</optional>
     </dependency>        
     
     <!-- Testing Dependencies -->

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerService.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,5 @@
+package org.apache.activemq.broker;
+
+/**
+ * Empty placeholder class: it declares no fields and no methods.
+ * NOTE(review): presumably a local stand-in for the 5.x broker class of the
+ * same name, so that this module compiles without activemq-core -- confirm.
+ */
+public class BrokerService {
+
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/BrokerServiceAware.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,5 @@
+package org.apache.activemq.broker;
+
+/**
+ * Empty marker interface: it declares no methods.
+ * NOTE(review): presumably a local stand-in for the 5.x interface of the same
+ * name, so that this module compiles without activemq-core -- confirm.
+ */
+public interface BrokerServiceAware {
+
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+
+/**
+ * Implementations of this interface are used to map back and forth between
+ * Stomp frames and ActiveMQ messages. Several standard header mappings are
+ * semantically the same for every translator; the nested class, Helper,
+ * provides functions to copy those headers from one representation to the
+ * other.
+ */
+public interface FrameTranslator {
+
+    /**
+     * Converts a Stomp frame into an ActiveMQ message.
+     *
+     * @param converter the protocol converter handling the frame
+     * @param frame the frame to convert
+     * @return the resulting message
+     */
+    ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException;
+
+    /**
+     * Converts an ActiveMQ message into a Stomp frame.
+     *
+     * @param converter the protocol converter handling the message
+     * @param message the message to convert
+     * @return the resulting frame
+     */
+    StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException;
+
+    /**
+     * Converts a JMS destination into its Stomp destination name.
+     */
+    String convertDestination(ProtocolConverter converter, Destination d);
+
+    /**
+     * Converts a Stomp destination name into an ActiveMQ destination.
+     */
+    ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException;
+
+    /**
+     * Helper class which holds commonly needed functions used when implementing
+     * FrameTranslators
+     */
+    static final class Helper {
+
+        private Helper() {
+        }
+
+        /**
+         * Copies the standard JMS headers of <code>message</code>, followed by
+         * all of its properties, into the header map of <code>command</code>.
+         * Optional headers (correlation id, reply-to, type) are written only
+         * when set, and the redelivered header only when the message was
+         * redelivered. Properties are written last, so a property that shares
+         * a key with a standard header replaces it.
+         *
+         * @param converter passed through to <code>ft</code> for destination conversion
+         * @param message the message whose headers are read
+         * @param command the frame whose header map is populated
+         * @param ft the translator used to convert destinations to Stomp names
+         */
+        public static void copyStandardHeadersFromMessageToFrame(ProtocolConverter converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
+            final Map<String, String> headers = command.getHeaders();
+            headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(converter, message.getDestination()));
+            headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
+
+            final String correlationId = message.getJMSCorrelationID();
+            if (correlationId != null) {
+                headers.put(Stomp.Headers.Message.CORRELATION_ID, correlationId);
+            }
+            headers.put(Stomp.Headers.Message.EXPIRATION_TIME, String.valueOf(message.getJMSExpiration()));
+
+            if (message.getJMSRedelivered()) {
+                headers.put(Stomp.Headers.Message.REDELIVERED, "true");
+            }
+            headers.put(Stomp.Headers.Message.PRORITY, String.valueOf(message.getJMSPriority()));
+
+            final Destination replyTo = message.getJMSReplyTo();
+            if (replyTo != null) {
+                headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(converter, replyTo));
+            }
+            headers.put(Stomp.Headers.Message.TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
+
+            final String type = message.getJMSType();
+            if (type != null) {
+                headers.put(Stomp.Headers.Message.TYPE, type);
+            }
+
+            // now lets add all the message properties as headers
+            final Map<String, Object> properties = message.getProperties();
+            if (properties != null) {
+                for (Map.Entry<String, Object> prop : properties.entrySet()) {
+                    headers.put(prop.getKey(), String.valueOf(prop.getValue()));
+                }
+            }
+        }
+
+        /**
+         * Applies the headers of <code>command</code> to <code>msg</code>. The
+         * standard headers (destination, correlation id, expiration, priority,
+         * type, reply-to, persistent) are mapped onto the matching message
+         * fields; every remaining header is set as a message property. The
+         * frame's own header map is not modified.
+         *
+         * @param converter passed through to <code>ft</code> for destination conversion
+         * @param command the frame whose headers are read
+         * @param msg the message to populate
+         * @param ft the translator used to convert Stomp names to destinations
+         * @throws ProtocolException if a destination name is rejected by
+         *         <code>ft</code>, or if the expiration or priority header is
+         *         not a valid number
+         */
+        public static void copyStandardHeadersFromFrameToMessage(ProtocolConverter converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
+            // Work on a copy: headers are removed as they are consumed so that
+            // only the non-standard ones are left for setProperties() below.
+            final Map<String, String> headers = new HashMap<String, String>(command.getHeaders());
+            final String destination = headers.remove(Stomp.Headers.Send.DESTINATION);
+            msg.setDestination(ft.convertDestination(converter, destination));
+
+            // the standard JMS headers
+            msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+
+            final String expiration = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
+            if (expiration != null) {
+                msg.setJMSExpiration(parseLongHeader(Stomp.Headers.Send.EXPIRATION_TIME, expiration));
+            }
+
+            final String priority = headers.remove(Stomp.Headers.Send.PRIORITY);
+            if (priority != null) {
+                msg.setJMSPriority(parseIntHeader(Stomp.Headers.Send.PRIORITY, priority));
+            }
+
+            final String type = headers.remove(Stomp.Headers.Send.TYPE);
+            if (type != null) {
+                msg.setJMSType(type);
+            }
+
+            final String replyTo = headers.remove(Stomp.Headers.Send.REPLY_TO);
+            if (replyTo != null) {
+                msg.setJMSReplyTo(ft.convertDestination(converter, replyTo));
+            }
+
+            final String persistent = headers.remove(Stomp.Headers.Send.PERSISTENT);
+            if (persistent != null) {
+                msg.setPersistent("true".equals(persistent));
+            }
+
+            // now the general headers
+            msg.setProperties(headers);
+        }
+
+        /**
+         * Parses a numeric header as a long, reporting a malformed value as a
+         * ProtocolException instead of an unchecked NumberFormatException.
+         */
+        private static long parseLongHeader(String name, String value) throws ProtocolException {
+            try {
+                return Long.parseLong(value);
+            } catch (NumberFormatException e) {
+                throw new ProtocolException("Invalid value for header " + name + ": [" + value + "]", false, e);
+            }
+        }
+
+        /**
+         * Parses a numeric header as an int, reporting a malformed value as a
+         * ProtocolException instead of an unchecked NumberFormatException.
+         */
+        private static int parseIntHeader(String name, String value) throws ProtocolException {
+            try {
+                return Integer.parseInt(value);
+            } catch (NumberFormatException e) {
+                throw new ProtocolException("Invalid value for header " + name + ": [" + value + "]", false, e);
+            }
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.HierarchicalStreamReader;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
+import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
+import com.thoughtworks.xstream.io.xml.XppReader;
+
+/**
+ * Frame translator implementation that uses XStream to convert messages to and
+ * from XML and JSON
+ * 
+ * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
+ */
+public class JmsFrameTranslator extends LegacyFrameTranslator implements
+        ApplicationContextAware {
+
+    XStream xStream = null;
+    ApplicationContext applicationContext;
+
+    /**
+     * Converts a Stomp frame into an ActiveMQ message. Frames that carry no
+     * transformation header, that carry a content-length header, or that ask
+     * for the byte transformation are handled by the superclass. All other
+     * frames have their body decoded as UTF-8 text and unmarshalled with
+     * XStream into an object or map message, according to the transformation.
+     * If that fails, the failure message is stored in the frame's
+     * transformation-error header and the frame is converted by the superclass
+     * instead.
+     *
+     * @param converter the protocol converter handling the frame
+     * @param command the frame to convert
+     * @return the resulting message
+     */
+    public ActiveMQMessage convertFrame(ProtocolConverter converter,
+            StompFrame command) throws JMSException, ProtocolException {
+        Map<String, String> headers = command.getHeaders();
+        ActiveMQMessage msg;
+        String transformation = headers.get(Stomp.Headers.TRANSFORMATION);
+        // A missing transformation header used to cause a NullPointerException
+        // here; such frames now take the legacy path like any untransformed frame.
+        if (transformation == null
+                || headers.containsKey(Stomp.Headers.CONTENT_LENGTH)
+                || Stomp.Transformations.JMS_BYTE.toString().equals(transformation)) {
+            msg = super.convertFrame(converter, command);
+        } else {
+            HierarchicalStreamReader in;
+
+            try {
+                String text = new String(command.getContent(), "UTF-8");
+                switch (Stomp.Transformations.getValue(transformation)) {
+                case JMS_OBJECT_XML:
+                    in = new XppReader(new StringReader(text));
+                    msg = createObjectMessage(in);
+                    break;
+                case JMS_OBJECT_JSON:
+                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+                    msg = createObjectMessage(in);
+                    break;
+                case JMS_MAP_XML:
+                    in = new XppReader(new StringReader(text));
+                    msg = createMapMessage(in);
+                    break;
+                case JMS_MAP_JSON:
+                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+                    msg = createMapMessage(in);
+                    break;
+                default:
+                    throw new Exception("Unkown transformation: " + transformation);
+                }
+            } catch (Throwable e) {
+                // Best effort: record why the transformation failed and fall
+                // back to the untransformed conversion rather than failing.
+                command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
+                msg = super.convertFrame(converter, command);
+            }
+        }
+        // NOTE(review): super.convertFrame() already copies the standard
+        // headers, so on the fallback paths they are copied twice -- confirm
+        // this is harmless before removing either call.
+        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+        return msg;
+    }
+
+    /**
+     * Converts an ActiveMQ message into a Stomp frame. Object and map messages
+     * are marshalled to text with XStream, in the format named by the
+     * transformation header, and sent as UTF-8 bytes; every other message type
+     * is handled by the superclass.
+     *
+     * @param converter the protocol converter handling the message
+     * @param message the message to convert
+     * @return the resulting frame
+     */
+    public StompFrame convertMessage(ProtocolConverter converter,
+            ActiveMQMessage message) throws IOException, JMSException {
+        if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
+            StompFrame command = new StompFrame();
+            command.setAction(Stomp.Responses.MESSAGE);
+            Map<String, String> headers = new HashMap<String, String>(25);
+            command.setHeaders(headers);
+
+            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+                    converter, message, command, this);
+            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
+            String body = marshall(msg.getObject(), headers.get(Stomp.Headers.TRANSFORMATION));
+            command.setContent(body.getBytes("UTF-8"));
+            return command;
+
+        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
+            StompFrame command = new StompFrame();
+            command.setAction(Stomp.Responses.MESSAGE);
+            Map<String, String> headers = new HashMap<String, String>(25);
+            command.setHeaders(headers);
+
+            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+                    converter, message, command, this);
+            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+            String body = marshall((Serializable) msg.getContentMap(), headers.get(Stomp.Headers.TRANSFORMATION));
+            command.setContent(body.getBytes("UTF-8"));
+            return command;
+        } else {
+            return super.convertMessage(converter, message);
+        }
+    }
+
+    /**
+     * Marshalls the Object to a string using XML or JSON encoding. JSON is
+     * used when the transformation name ends with "json" (ignoring case);
+     * XML is used otherwise, including when no transformation is given.
+     *
+     * @param object the object to marshal
+     * @param transformation the transformation name, may be null
+     * @return the marshalled text
+     */
+    protected String marshall(Serializable object, String transformation)
+            throws JMSException {
+        StringWriter buffer = new StringWriter();
+        HierarchicalStreamWriter out;
+        // A null transformation used to cause a NullPointerException here.
+        if (transformation != null && transformation.toLowerCase().endsWith("json")) {
+            out = new JettisonMappedXmlDriver().createWriter(buffer);
+        } else {
+            out = new PrettyPrintWriter(buffer);
+        }
+        getXStream().marshal(object, out);
+        return buffer.toString();
+    }
+
+    /**
+     * Unmarshals the stream with XStream and wraps the result in an object
+     * message.
+     */
+    protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
+        ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
+        // NOTE(review): this unmarshals client-supplied data; XStream can
+        // instantiate arbitrary types unless it is restricted -- confirm the
+        // configured XStream instance limits the permitted types.
+        Object obj = getXStream().unmarshal(in);
+        objMsg.setObject((Serializable) obj);
+        return objMsg;
+    }
+
+    /**
+     * Unmarshals the stream with XStream into a map and copies each entry
+     * into a new map message.
+     */
+    protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
+        ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
+        // The cast cannot be checked at runtime; an unmarshalled value that is
+        // not a map fails here with a ClassCastException. The same caution
+        // about client-supplied data as in createObjectMessage applies.
+        @SuppressWarnings("unchecked")
+        Map<String, Object> map = (Map<String, Object>) getXStream().unmarshal(in);
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            mapMsg.setObject(entry.getKey(), entry.getValue());
+        }
+        return mapMsg;
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    /**
+     * Returns the XStream instance, creating it on first use via
+     * {@link #createXStream()}.
+     */
+    public XStream getXStream() {
+        if (xStream == null) {
+            xStream = createXStream();
+        }
+        return xStream;
+    }
+
+    public void setXStream(XStream xStream) {
+        this.xStream = xStream;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+    /**
+     * Returns the first non-null XStream bean found in the application
+     * context, or a new default XStream when there is no context or no such
+     * bean.
+     */
+    protected XStream createXStream() {
+        XStream xstream = null;
+        if (applicationContext != null) {
+            String[] names = applicationContext
+                    .getBeanNamesForType(XStream.class);
+            for (String name : names) {
+                xstream = (XStream) applicationContext.getBean(name);
+                if (xstream != null) {
+                    break;
+                }
+            }
+        }
+
+        if (xstream == null) {
+            xstream = new XStream();
+        }
+        return xstream;
+    }
+
+    public void setApplicationContext(ApplicationContext applicationContext)
+            throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+
+/**
+ * Implements ActiveMQ 4.0 translations
+ */
+public class LegacyFrameTranslator implements FrameTranslator {
+
+    /**
+     * Converts a Stomp frame into an ActiveMQ message. A frame with a
+     * content-length header becomes a bytes message, and that header is
+     * removed from the frame; any other frame becomes a text message whose
+     * text is the frame content decoded as UTF-8. The remaining headers are
+     * then copied onto the message.
+     *
+     * @param converter the protocol converter handling the frame
+     * @param command the frame to convert
+     * @return the resulting message
+     * @throws ProtocolException if the text body could not be set
+     */
+    public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
+        final Map<String, String> headers = command.getHeaders();
+        final ActiveMQMessage msg;
+        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
+            headers.remove(Stomp.Headers.CONTENT_LENGTH);
+            ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
+            bm.writeBytes(command.getContent());
+            msg = bm;
+        } else {
+            ActiveMQTextMessage text = new ActiveMQTextMessage();
+            try {
+                text.setText(new String(command.getContent(), "UTF-8"));
+            } catch (Throwable e) {
+                throw new ProtocolException("Text could not bet set: " + e, false, e);
+            }
+            msg = text;
+        }
+        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+        return msg;
+    }
+
+    /**
+     * Converts an ActiveMQ message into a Stomp MESSAGE frame. The standard
+     * headers are always copied. A text message contributes its text as UTF-8
+     * bytes; a bytes message contributes its body together with a
+     * content-length header. Other message types get headers only.
+     *
+     * @param converter the protocol converter handling the message
+     * @param message the message to convert
+     * @return the resulting frame
+     */
+    public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
+        StompFrame command = new StompFrame();
+        command.setAction(Stomp.Responses.MESSAGE);
+        Map<String, String> headers = new HashMap<String, String>(25);
+        command.setHeaders(headers);
+
+        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
+
+        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+
+            ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+            String text = msg.getText();
+            // getText() can return null for a text message with no text set;
+            // the content is then left as the new frame has it, where the
+            // previous code threw a NullPointerException.
+            if (text != null) {
+                command.setContent(text.getBytes("UTF-8"));
+            }
+
+        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
+
+            ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+            msg.setReadOnlyBody(true);
+            byte[] data = new byte[(int)msg.getBodyLength()];
+            msg.readBytes(data);
+
+            headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
+            command.setContent(data);
+        }
+        return command;
+    }
+
+    /**
+     * Converts a destination into its Stomp name. A name that the converter
+     * has recorded for a temporary destination it created takes precedence;
+     * otherwise the physical name is prefixed with /queue/, /topic/,
+     * /remote-temp-queue/ or /remote-temp-topic/ according to the destination
+     * type.
+     *
+     * @param converter the protocol converter consulted for temp destination names
+     * @param d the destination, may be null
+     * @return the Stomp name, or null if <code>d</code> is null
+     */
+    public String convertDestination(ProtocolConverter converter, Destination d) {
+        if (d == null) {
+            return null;
+        }
+        ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
+        String physicalName = activeMQDestination.getPhysicalName();
+
+        String rc = converter.getCreatedTempDestinationName(activeMQDestination);
+        if (rc != null) {
+            return rc;
+        }
+
+        // StringBuilder: the buffer never leaves this method, so the
+        // synchronization of StringBuffer is not needed.
+        StringBuilder buffer = new StringBuilder();
+        if (activeMQDestination.isQueue()) {
+            if (activeMQDestination.isTemporary()) {
+                buffer.append("/remote-temp-queue/");
+            } else {
+                buffer.append("/queue/");
+            }
+        } else {
+            if (activeMQDestination.isTemporary()) {
+                buffer.append("/remote-temp-topic/");
+            } else {
+                buffer.append("/topic/");
+            }
+        }
+        buffer.append(physicalName);
+        return buffer.toString();
+    }
+
+    /**
+     * Converts a Stomp destination name into an ActiveMQ destination. The
+     * prefix selects the type: /queue/, /topic/, /remote-temp-queue/ and
+     * /remote-temp-topic/ are stripped and the rest is used as the physical
+     * name, while /temp-queue/ and /temp-topic/ names are passed whole to the
+     * converter, which creates the temporary destination.
+     *
+     * @param converter the protocol converter used to create temp destinations
+     * @param name the Stomp destination name, may be null
+     * @return the destination, or null if <code>name</code> is null
+     * @throws ProtocolException if the name starts with none of the known prefixes
+     */
+    public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException {
+        if (name == null) {
+            return null;
+        } else if (name.startsWith("/queue/")) {
+            String qName = name.substring("/queue/".length());
+            return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
+        } else if (name.startsWith("/topic/")) {
+            String tName = name.substring("/topic/".length());
+            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
+        } else if (name.startsWith("/remote-temp-queue/")) {
+            String tName = name.substring("/remote-temp-queue/".length());
+            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
+        } else if (name.startsWith("/remote-temp-topic/")) {
+            String tName = name.substring("/remote-temp-topic/".length());
+            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
+        } else if (name.startsWith("/temp-queue/")) {
+            return converter.createTempQueue(name);
+        } else if (name.startsWith("/temp-topic/")) {
+            return converter.createTempTopic(name);
+        } else {
+            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+                                        + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Converts between STOMP frames and ActiveMQ commands on behalf of one
+ * connection: STOMP frames enter through {@link #onStompCommand(StompFrame)}
+ * and ActiveMQ commands through {@link #onActiveMQCommand(Command)}.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class ProtocolConverter {
+
+    // Shared generator for the connection id of each converter instance.
+    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
+    // Each converter owns exactly one connection, one session and one producer id.
+    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
+    private final SessionId sessionId = new SessionId(connectionId, -1);
+    private final ProducerId producerId = new ProducerId(sessionId, 1);
+
+    // Sequence generators for the ids handed out by this converter.
+    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
+
+    // Handlers waiting for a response, keyed by the command id of the request
+    // (registered in sendToActiveMQ, removed in onActiveMQCommand).
+    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
+    // Active subscriptions, keyed by the consumer id created in onStompSubscribe.
+    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
+    // Temporary destinations created so far, keyed by their STOMP name.
+    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
+    // Reverse lookup: ActiveMQ qualified destination name -> STOMP name.
+    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
+    // Open transactions: STOMP transaction header value -> ActiveMQ transaction id.
+    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
+    private final StompTransportFilter transportFilter;
+
+    // Guards lastCommandId (see generateCommandId).
+    private final Object commnadIdMutex = new Object();
+    private int lastCommandId;
+    // True between a completed CONNECT handshake and a DISCONNECT.
+    private final AtomicBoolean connected = new AtomicBoolean(false);
+    // Default translator, used when no transformation is requested or its lookup fails.
+    private final FrameTranslator frameTranslator;
+    // NOTE: named like a constant but is an instance field; looks translators up by name.
+    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
+    // Passed to translators that implement ApplicationContextAware.
+    private final ApplicationContext applicationContext;
+
+    /**
+     * Creates a converter bound to the given transport filter.
+     *
+     * @param stompTransportFilter filter through which frames and commands are sent
+     * @param translator default frame translator
+     * @param applicationContext context handed to translators that implement
+     *        {@link ApplicationContextAware}
+     */
+    public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator, ApplicationContext applicationContext) {
+        this.applicationContext = applicationContext;
+        this.frameTranslator = translator;
+        this.transportFilter = stompTransportFilter;
+    }
+
+    protected int generateCommandId() {
+        synchronized (commnadIdMutex) {
+            return lastCommandId++;
+        }
+    }
+
+    /**
+     * Builds the handler that answers a frame carrying a receipt header.
+     * <p>
+     * On a successful response the handler sends a RECEIPT frame with the
+     * requested receipt id; on an exception response it reports the error via
+     * {@link #handleException(Throwable, StompFrame)} without closing the
+     * connection.
+     *
+     * @param command the STOMP frame being processed
+     * @return a handler, or {@code null} when the frame did not request a receipt
+     */
+    protected ResponseHandler createResponseHandler(final StompFrame command) {
+        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        if (receiptId == null) {
+            return null;
+        }
+
+        return new ResponseHandler() {
+            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                if (!response.isException()) {
+                    StompFrame receipt = new StompFrame();
+                    receipt.setAction(Stomp.Responses.RECEIPT);
+                    receipt.setHeaders(new HashMap<String, String>(1));
+                    receipt.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                    transportFilter.sendToStomp(receipt);
+                    return;
+                }
+                // Generally a command can fail.. but that does not invalidate the connection.
+                // We report back the failure but we don't close the connection.
+                Throwable failure = ((ExceptionResponse)response).getException();
+                handleException(failure, command);
+            }
+        };
+    }
+
+    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
+        command.setCommandId(generateCommandId());
+        if (handler != null) {
+            command.setResponseRequired(true);
+            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+        }
+        transportFilter.sendToActiveMQ(command);
+    }
+
+    /**
+     * Hands a frame to the transport filter for delivery to the STOMP client.
+     *
+     * @param command the frame to send
+     * @throws IOException if the transport filter fails to send it
+     */
+    protected void sendToStomp(StompFrame command) throws IOException {
+        this.transportFilter.sendToStomp(command);
+    }
+    
+    /**
+     * Resolves the frame translator named by a transformation header.
+     *
+     * @param header translator name, or {@code null} for the default translator
+     * @return the translator created for {@code header}; the default translator
+     *         when {@code header} is {@code null} or the translator cannot be created
+     */
+    protected FrameTranslator findTranslator(String header) {
+        if (header == null) {
+            return frameTranslator;
+        }
+
+        FrameTranslator resolved = frameTranslator;
+        try {
+            resolved = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header);
+            if (resolved instanceof ApplicationContextAware) {
+                ((ApplicationContextAware)resolved).setApplicationContext(applicationContext);
+            }
+        } catch (Exception ignore) {
+            // if anything goes wrong use the default translator
+        }
+        return resolved;
+    }
+
+    /**
+	 * Convert a stomp command
+	 * 
+	 * @param command
+	 */
+    public void onStompCommand(StompFrame command) throws IOException, JMSException {
+        try {
+
+            if (command.getClass() == StompFrameError.class) {
+                throw ((StompFrameError)command).getException();
+            }
+
+            String action = command.getAction();
+            if (action.startsWith(Stomp.Commands.SEND)) {
+                onStompSend(command);
+            } else if (action.startsWith(Stomp.Commands.ACK)) {
+                onStompAck(command);
+            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
+                onStompBegin(command);
+            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
+                onStompCommit(command);
+            } else if (action.startsWith(Stomp.Commands.ABORT)) {
+                onStompAbort(command);
+            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
+                onStompSubscribe(command);
+            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
+                onStompUnsubscribe(command);
+            } else if (action.startsWith(Stomp.Commands.CONNECT)) {
+                onStompConnect(command);
+            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
+                onStompDisconnect(command);
+            } else {
+                throw new ProtocolException("Unknown STOMP action: " + action);
+            }
+
+        } catch (ProtocolException e) {
+            handleException(e, command);
+            // Some protocol errors can cause the connection to get closed.
+            if( e.isFatal() ) {
+               getTransportFilter().onException(e);
+            }
+        }
+    }
+    
+    protected void handleException(Throwable exception, StompFrame command) throws IOException {
+        // Let the stomp client know about any protocol errors.
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+        exception.printStackTrace(stream);
+        stream.close();
+
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
+
+        if (command != null) {
+        	final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        	if (receiptId != null) {
+        		headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+        	}
+        }
+
+        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+        sendToStomp(errorMessage);
+    }
+
+    protected void onStompSend(StompFrame command) throws IOException, JMSException {
+        checkConnected();
+
+        Map<String, String> headers = command.getHeaders();
+        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
+        headers.remove("transaction");
+
+        ActiveMQMessage message = convertMessage(command);
+
+        message.setProducerId(producerId);
+        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
+        message.setMessageId(id);
+        message.setJMSTimestamp(System.currentTimeMillis());
+
+        if (stompTx != null) {
+            TransactionId activemqTx = transactions.get(stompTx);
+            if (activemqTx == null) {
+                throw new ProtocolException("Invalid transaction id: " + stompTx);
+            }
+            message.setTransactionId(activemqTx);
+        }
+
+        message.onSend();
+        sendToActiveMQ(message, createResponseHandler(command));
+
+    }
+
+    protected void onStompAck(StompFrame command) throws ProtocolException {
+        checkConnected();
+
+        // TODO: acking with just a message id is very bogus
+        // since the same message id could have been sent to 2 different
+        // subscriptions
+        // on the same stomp connection. For example, when 2 subs are created on
+        // the same topic.
+
+        Map<String, String> headers = command.getHeaders();
+        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+        if (messageId == null) {
+            throw new ProtocolException("ACK received without a message-id to acknowledge!");
+        }
+
+        TransactionId activemqTx = null;
+        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx != null) {
+            activemqTx = transactions.get(stompTx);
+            if (activemqTx == null) {
+                throw new ProtocolException("Invalid transaction id: " + stompTx);
+            }
+        }
+
+        boolean acked = false;
+        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+            StompSubscription sub = iter.next();
+            MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
+            if (ack != null) {
+                ack.setTransactionId(activemqTx);
+                sendToActiveMQ(ack, createResponseHandler(command));
+                acked = true;
+                break;
+            }
+        }
+
+        if (!acked) {
+            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
+        }
+
+    }
+
+    /**
+     * Handles a STOMP BEGIN frame: allocates an ActiveMQ local transaction for
+     * the transaction name in the frame and announces it to ActiveMQ.
+     *
+     * @param command the BEGIN frame
+     * @throws ProtocolException if not connected, the transaction header is
+     *         missing, or a transaction with that name is already open
+     */
+    protected void onStompBegin(StompFrame command) throws ProtocolException {
+        checkConnected();
+
+        Map<String, String> headers = command.getHeaders();
+
+        // Check the value rather than the key's presence, like commit/abort do:
+        // the transactions map is a ConcurrentHashMap and rejects null keys, so
+        // a present-but-null header must become a protocol error, not an NPE.
+        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx == null) {
+            throw new ProtocolException("Must specify the transaction you are beginning");
+        }
+
+        if (transactions.get(stompTx) != null) {
+            throw new ProtocolException("The transaction was allready started: " + stompTx);
+        }
+
+        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
+        transactions.put(stompTx, activemqTx);
+
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connectionId);
+        tx.setTransactionId(activemqTx);
+        tx.setType(TransactionInfo.BEGIN);
+
+        sendToActiveMQ(tx, createResponseHandler(command));
+
+    }
+
+    protected void onStompCommit(StompFrame command) throws ProtocolException {
+        checkConnected();
+
+        Map<String, String> headers = command.getHeaders();
+
+        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx == null) {
+            throw new ProtocolException("Must specify the transaction you are committing");
+        }
+
+        TransactionId activemqTx = transactions.remove(stompTx);
+        if (activemqTx == null) {
+            throw new ProtocolException("Invalid transaction id: " + stompTx);
+        }
+        
+        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+            StompSubscription sub = iter.next();
+            sub.onStompCommit(activemqTx);
+        }
+
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connectionId);
+        tx.setTransactionId(activemqTx);
+        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
+
+        sendToActiveMQ(tx, createResponseHandler(command));
+        
+    }
+
+    /**
+     * Handles a STOMP ABORT frame: forgets the named transaction, lets every
+     * subscription roll back its state for it and sends a rollback to ActiveMQ.
+     *
+     * @param command the ABORT frame
+     * @throws ProtocolException if not connected, the transaction header is
+     *         missing, the transaction is unknown, or a subscription fails to
+     *         abort (non-fatal; the original failure is kept as the cause)
+     */
+    protected void onStompAbort(StompFrame command) throws ProtocolException {
+        checkConnected();
+        Map<String, String> headers = command.getHeaders();
+
+        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx == null) {
+            // The message used to say "committing", copied from onStompCommit.
+            throw new ProtocolException("Must specify the transaction you are aborting");
+        }
+
+        // Note: the transaction is removed before the subscriptions are
+        // notified, so it stays forgotten even if one of them fails below.
+        TransactionId activemqTx = transactions.remove(stompTx);
+        if (activemqTx == null) {
+            throw new ProtocolException("Invalid transaction id: " + stompTx);
+        }
+        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
+            try {
+                sub.onStompAbort(activemqTx);
+            } catch (Exception e) {
+                throw new ProtocolException("Transaction abort failed", false, e);
+            }
+        }
+
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connectionId);
+        tx.setTransactionId(activemqTx);
+        tx.setType(TransactionInfo.ROLLBACK);
+
+        sendToActiveMQ(tx, createResponseHandler(command));
+
+    }
+
+    /**
+     * Handles a STOMP SUBSCRIBE frame: creates a consumer for the requested
+     * destination, registers the matching {@code StompSubscription} and sends
+     * the consumer info to ActiveMQ.
+     * <p>
+     * The selector header is removed from the frame's headers and applied to
+     * the consumer; the remaining headers are offered to the consumer info
+     * with the {@code "activemq."} prefix. The ack header selects client,
+     * individual or (by default) auto acknowledgement.
+     *
+     * @param command the SUBSCRIBE frame
+     * @throws ProtocolException if not connected or the destination cannot be converted
+     */
+    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
+        checkConnected();
+        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
+        Map<String, String> headers = command.getHeaders();
+
+        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
+        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
+
+        // Convert once; the result is shared by the consumer info and the subscription.
+        ActiveMQDestination actualDest = translator.convertDestination(this, destination);
+        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+        ConsumerInfo consumerInfo = new ConsumerInfo(id);
+        consumerInfo.setPrefetchSize(1000);
+        consumerInfo.setDispatchAsync(true);
+
+        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
+        consumerInfo.setSelector(selector);
+
+        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
+
+        // Set after the header-driven properties, as before, so the converted
+        // destination is what ends up on the consumer info.
+        consumerInfo.setDestination(actualDest);
+
+        StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+        stompSubscription.setDestination(actualDest);
+
+        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
+            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
+        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
+            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
+        } else {
+            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
+        }
+
+        subscriptionsByConsumerId.put(id, stompSubscription);
+        sendToActiveMQ(consumerInfo, createResponseHandler(command));
+
+    }
+
+    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
+        checkConnected();
+        Map<String, String> headers = command.getHeaders();
+
+        ActiveMQDestination destination = null;
+        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
+        if (o != null) {
+            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o);
+        }
+
+        String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
+
+        if (subscriptionId == null && destination == null) {
+            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
+        }
+       
+        // check if it is a durable subscription
+        String durable = command.getHeaders().get("activemq.subscriptionName"); 
+        if (durable != null) {
+            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+            info.setClientId(durable);
+            info.setSubscriptionName(durable);
+            info.setConnectionId(connectionId);
+            sendToActiveMQ(info, createResponseHandler(command));
+            return;
+        }
+
+        // TODO: Unsubscribing using a destination is a bit wierd if multiple
+        // subscriptions
+        // are created with the same destination. Perhaps this should be
+        // removed.
+        //
+        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+            StompSubscription sub = iter.next();
+            if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
+                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+                iter.remove();
+                return;
+            }
+        }
+       
+        throw new ProtocolException("No subscription matched.");
+    }
+
+    // Connection details filled in by onStompConnect; onStompDisconnect uses it
+    // to build the remove command. Package-private, non-final field.
+    ConnectionInfo connectionInfo = new ConnectionInfo();
+    
+    /**
+     * Handles a STOMP CONNECT frame.
+     * <p>
+     * Sends the connection info to ActiveMQ; once that succeeds the session
+     * and producer are registered, and once the producer is acknowledged the
+     * converter is marked connected and a CONNECTED frame is returned to the
+     * client. If either step fails, the error is reported to the client, the
+     * failure is passed to the transport filter's {@code onException}, and no
+     * CONNECTED frame is sent.
+     *
+     * @param command the CONNECT frame
+     * @throws ProtocolException if this converter is already connected
+     */
+    protected void onStompConnect(final StompFrame command) throws ProtocolException {
+
+        if (connected.get()) {
+            throw new ProtocolException("Allready connected.");
+        }
+
+        final Map<String, String> headers = command.getHeaders();
+
+        // allow anyone to login for now
+        String login = headers.get(Stomp.Headers.Connect.LOGIN);
+        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
+        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
+
+
+        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
+
+        connectionInfo.setConnectionId(connectionId);
+        if (clientId != null) {
+            connectionInfo.setClientId(clientId);
+        } else {
+            // No client id supplied: fall back to the connection id.
+            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
+        }
+
+        connectionInfo.setResponseRequired(true);
+        connectionInfo.setUserName(login);
+        connectionInfo.setPassword(passcode);
+
+        sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+
+                if (response.isException()) {
+                    // If the connection attempt fails we close the socket.
+                    Throwable exception = ((ExceptionResponse)response).getException();
+                    handleException(exception, command);
+                    getTransportFilter().onException(IOExceptionSupport.create(exception));
+                    return;
+                }
+
+                final SessionInfo sessionInfo = new SessionInfo(sessionId);
+                sendToActiveMQ(sessionInfo, null);
+
+                final ProducerInfo producerInfo = new ProducerInfo(producerId);
+                sendToActiveMQ(producerInfo, new ResponseHandler() {
+                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+
+                        if (response.isException()) {
+                            // If the connection attempt fails we close the socket.
+                            Throwable exception = ((ExceptionResponse)response).getException();
+                            handleException(exception, command);
+                            getTransportFilter().onException(IOExceptionSupport.create(exception));
+                            // Stop here: a failed producer registration must not
+                            // mark the connection as connected nor send CONNECTED.
+                            return;
+                        }
+
+                        connected.set(true);
+                        HashMap<String, String> responseHeaders = new HashMap<String, String>();
+
+                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
+                        String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
+                        if (requestId == null) {
+                            // TODO legacy
+                            requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
+                        }
+                        if (requestId != null) {
+                            // TODO legacy
+                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
+                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
+                        }
+
+                        StompFrame sc = new StompFrame();
+                        sc.setAction(Stomp.Responses.CONNECTED);
+                        sc.setHeaders(responseHeaders);
+                        sendToStomp(sc);
+                    }
+                });
+
+            }
+        });
+    }
+
+    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
+        checkConnected();
+        sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
+        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
+        connected.set(false);
+    }
+
+    /**
+     * Guards handlers that require a completed CONNECT.
+     *
+     * @throws ProtocolException if the converter is not currently connected
+     */
+    protected void checkConnected() throws ProtocolException {
+        if (connected.get()) {
+            return;
+        }
+        throw new ProtocolException("Not connected.");
+    }
+
+    /**
+     * Handles a command arriving from ActiveMQ.
+     * <ul>
+     * <li>Responses go to the handler registered for their correlation id;
+     * an exception response without a handler is reported to the client.</li>
+     * <li>Message dispatches go to the subscription of their consumer id and
+     * are dropped when no such subscription exists.</li>
+     * <li>Connection errors are reported to the client.</li>
+     * </ul>
+     * Any other command is ignored.
+     *
+     * @param command the command to process
+     * @throws IOException propagated from the response handler, the subscription
+     *         or from reporting an error
+     * @throws JMSException propagated from the subscription's dispatch handling
+     */
+    public void onActiveMQCommand(Command command) throws IOException, JMSException {
+        if (command.isResponse()) {
+            final Response response = (Response)command;
+            final ResponseHandler handler = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
+            if (handler != null) {
+                handler.onResponse(this, response);
+            } else if (response.isException()) {
+                // Pass down any unexpected errors. Should this close the connection?
+                Throwable failure = ((ExceptionResponse)response).getException();
+                handleException(failure, null);
+            }
+            return;
+        }
+
+        if (command.isMessageDispatch()) {
+            final MessageDispatch dispatch = (MessageDispatch)command;
+            final StompSubscription target = subscriptionsByConsumerId.get(dispatch.getConsumerId());
+            if (target != null) {
+                target.onMessageDispatch(dispatch);
+            }
+            return;
+        }
+
+        if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
+            // Pass down any unexpected async errors. Should this close the connection?
+            Throwable failure = ((ConnectionError)command).getException();
+            handleException(failure, null);
+        }
+    }
+
+    /**
+     * Converts a STOMP frame into an ActiveMQ message using the translator
+     * selected by the frame's transformation header.
+     *
+     * @param command the frame to convert
+     * @return the converted message
+     * @throws IOException propagated from the translator
+     * @throws JMSException propagated from the translator
+     */
+    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
+        final String transformation = command.getHeaders().get(Stomp.Headers.TRANSFORMATION);
+        final FrameTranslator translator = findTranslator(transformation);
+        return translator.convertFrame(this, command);
+    }
+
+    /**
+     * Converts an ActiveMQ message into a STOMP frame.
+     *
+     * @param message the message to convert
+     * @param ignoreTransformation when {@code true} the default translator is
+     *        used; otherwise the translator is chosen from the message's
+     *        transformation property
+     * @return the converted frame
+     * @throws IOException propagated from the translator
+     * @throws JMSException if the property cannot be read or conversion fails
+     */
+    public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
+        final FrameTranslator translator = ignoreTransformation
+                ? frameTranslator
+                : findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION));
+        return translator.convertMessage(this, message);
+    }
+
+    /**
+     * @return the transport filter this converter was created with
+     */
+    public StompTransportFilter getTransportFilter() {
+        return this.transportFilter;
+    }
+
+	/**
+	 * Returns the temporary queue registered under the given STOMP name,
+	 * creating it and announcing it to ActiveMQ on first use.
+	 * <p>
+	 * The reverse (ActiveMQ name to STOMP name) mapping is recorded as well,
+	 * as createTempTopic does, so that getCreatedTempDestinationName can
+	 * resolve temporary queues too.
+	 *
+	 * @param name the STOMP destination name
+	 * @return the temporary destination for that name
+	 */
+	public ActiveMQDestination createTempQueue(String name) {
+		ActiveMQDestination rc = tempDestinations.get(name);
+		if( rc == null ) {
+			rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
+			sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+			tempDestinations.put(name, rc);
+			tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
+		}
+		return rc;
+	}
+
+	/**
+	 * Returns the temporary topic registered under the given STOMP name. On
+	 * first use the topic is created, announced to ActiveMQ and recorded in
+	 * both the name lookup and the reverse (ActiveMQ name to STOMP name) lookup.
+	 *
+	 * @param name the STOMP destination name
+	 * @return the temporary destination for that name
+	 */
+	public ActiveMQDestination createTempTopic(String name) {
+		final ActiveMQDestination cached = tempDestinations.get(name);
+		if (cached != null) {
+			return cached;
+		}
+
+		final ActiveMQDestination created = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
+		sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, created), null);
+		tempDestinations.put(name, created);
+		tempDestinationAmqToStompMap.put(created.getQualifiedName(), name);
+		return created;
+	}
+
+	/**
+	 * Looks up the STOMP name under which a temporary destination was created.
+	 *
+	 * @param destination the ActiveMQ destination
+	 * @return the STOMP name recorded for its qualified name, or {@code null}
+	 *         if none was recorded
+	 */
+	public String getCreatedTempDestinationName(ActiveMQDestination destination) {
+		final String qualifiedName = destination.getQualifiedName();
+		return tempDestinationAmqToStompMap.get(qualifiedName);
+	}
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolException.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+
+/**
+ * Signals a STOMP protocol error. The {@code fatal} flag tells the caller
+ * whether the error should also be escalated to the transport (see
+ * {@code ProtocolConverter.onStompCommand}).
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class ProtocolException extends IOException {
+
+    private static final long serialVersionUID = -2869735532997332242L;
+
+    private final boolean fatal;
+
+    /** Creates a non-fatal exception without a message. */
+    public ProtocolException() {
+        this(null);
+    }
+
+    /**
+     * Creates a non-fatal exception.
+     *
+     * @param s the detail message
+     */
+    public ProtocolException(String s) {
+        this(s, false);
+    }
+
+    /**
+     * @param s the detail message
+     * @param fatal whether the error is fatal
+     */
+    public ProtocolException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    /**
+     * @param s the detail message
+     * @param fatal whether the error is fatal
+     * @param cause the underlying failure, or {@code null} if there is none
+     */
+    public ProtocolException(String s, boolean fatal, Throwable cause) {
+        super(s);
+        this.fatal = fatal;
+        // initCause may be called at most once, even with null; only call it
+        // for a real cause so callers can still attach one later.
+        if (cause != null) {
+            initCause(cause);
+        }
+    }
+
+    /**
+     * @return {@code true} if this error was flagged fatal at construction
+     */
+    public boolean isFatal() {
+        return fatal;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ResponseHandler.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.Response;
+
+/**
+ * Interface used by the ProtocolConverter for callbacks.
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a> 
+ */
+interface ResponseHandler {
+    /**
+     * Called with the response to the command this handler was registered for.
+     *
+     * @param converter the converter that received the response
+     * @param response the response received from ActiveMQ
+     * @throws IOException if handling the response fails
+     */
+    void onResponse(ProtocolConverter converter, Response response) throws IOException;
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/Stomp.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+public interface Stomp {
+    String NULL = "\u0000";
+    String NEWLINE = "\n";
+
+    public static interface Commands {
+        String CONNECT = "CONNECT";
+        String SEND = "SEND";
+        String DISCONNECT = "DISCONNECT";
+        String SUBSCRIBE = "SUB";
+        String UNSUBSCRIBE = "UNSUB";
+
+        String BEGIN_TRANSACTION = "BEGIN";
+        String COMMIT_TRANSACTION = "COMMIT";
+        String ABORT_TRANSACTION = "ABORT";
+        String BEGIN = "BEGIN";
+        String COMMIT = "COMMIT";
+        String ABORT = "ABORT";
+        String ACK = "ACK";
+    }
+
+    public interface Responses {
+        String CONNECTED = "CONNECTED";
+        String ERROR = "ERROR";
+        String MESSAGE = "MESSAGE";
+        String RECEIPT = "RECEIPT";
+    }
+
+    public interface Headers {
+        String SEPERATOR = ":";
+        String RECEIPT_REQUESTED = "receipt";
+        String TRANSACTION = "transaction";
+        String CONTENT_LENGTH = "content-length";
+        String TRANSFORMATION = "transformation";
+        String TRANSFORMATION_ERROR = "transformation-error";
+
+        public interface Response {
+            String RECEIPT_ID = "receipt-id";
+        }
+
+        public interface Send {
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String REPLY_TO = "reply-to";
+            String EXPIRATION_TIME = "expires";
+            String PRIORITY = "priority";
+            String TYPE = "type";
+            Object PERSISTENT = "persistent";
+        }
+
+        public interface Message {
+            String MESSAGE_ID = "message-id";
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String EXPIRATION_TIME = "expires";
+            String REPLY_TO = "reply-to";
+            String PRORITY = "priority";
+            String REDELIVERED = "redelivered";
+            String TIMESTAMP = "timestamp";
+            String TYPE = "type";
+            String SUBSCRIPTION = "subscription";
+        }
+
+        public interface Subscribe {
+            String DESTINATION = "destination";
+            String ACK_MODE = "ack";
+            String ID = "id";
+            String SELECTOR = "selector";
+
+            public interface AckModeValues {
+                String AUTO = "auto";
+                String CLIENT = "client";
+                String INDIVIDUAL = "client-individual";
+            }
+        }
+
+        public interface Unsubscribe {
+            String DESTINATION = "destination";
+            String ID = "id";
+        }
+
+        public interface Connect {
+            String LOGIN = "login";
+            String PASSCODE = "passcode";
+            String CLIENT_ID = "client-id";
+            String REQUEST_ID = "request-id";
+        }
+
+        public interface Error {
+            String MESSAGE = "message";
+        }
+
+        public interface Connected {
+            String SESSION = "session";
+            String RESPONSE_ID = "response-id";
+        }
+
+        public interface Ack {
+            String MESSAGE_ID = "message-id";
+        }
+    }
+    
+	public enum Transformations {
+		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
+		
+		public String toString() {
+			return name().replaceAll("_", "-").toLowerCase();
+		}
+		
+		public static Transformations getValue(String value) {
+			return valueOf(value.replaceAll("-", "_").toUpperCase());
+		}
+	}    
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
+
+/**
+ * Minimal blocking STOMP client that talks to a broker over a plain
+ * {@link Socket}.
+ * <p>
+ * Outgoing frames are written as UTF-8 text followed by a NUL byte. Nothing
+ * in this class is synchronized, so an instance is meant to be driven by one
+ * thread at a time.
+ */
+public class StompConnection {
+
+    /** Default receive timeout in milliseconds. */
+    public static final long RECEIVE_TIMEOUT = 10000;
+
+    private Socket stompSocket;
+    /** Bytes of the frame currently being read by {@link #receiveFrame(long)}. */
+    private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
+
+    /**
+     * Opens a new socket to the given host and port and uses it for all
+     * further traffic.
+     *
+     * @param host host to connect to
+     * @param port port to connect to
+     * @throws UnknownHostException if the host cannot be resolved
+     * @throws IOException if the socket cannot be created
+     */
+    public void open(String host, int port) throws IOException, UnknownHostException {
+        open(new Socket(host, port));
+    }
+
+    /**
+     * Uses an already connected socket for all further traffic.
+     *
+     * @param socket the socket to use
+     */
+    public void open(Socket socket) {
+        stompSocket = socket;
+    }
+
+    /**
+     * Closes the socket, if any. The reference is cleared before closing so
+     * that a failing close does not leave a half-closed socket behind.
+     *
+     * @throws IOException if closing the socket fails
+     */
+    public void close() throws IOException {
+        Socket socket = stompSocket;
+        stompSocket = null;
+        if (socket != null) {
+            socket.close();
+        }
+    }
+
+    /**
+     * Writes the given text as UTF-8, followed by the terminating NUL byte,
+     * and flushes the stream.
+     *
+     * @param data complete textual frame (command, headers, blank line, body)
+     * @throws Exception if writing to the socket fails
+     */
+    public void sendFrame(String data) throws Exception {
+        byte[] bytes = data.getBytes("UTF-8");
+        OutputStream outputStream = stompSocket.getOutputStream();
+        outputStream.write(bytes);
+        outputStream.write(0);
+        outputStream.flush();
+    }
+
+    /**
+     * Reads the next frame, waiting at most {@link #RECEIVE_TIMEOUT}
+     * milliseconds on each socket read.
+     *
+     * @return the frame that was read
+     * @throws Exception if reading or unmarshalling fails
+     */
+    public StompFrame receive() throws Exception {
+        return receive(RECEIVE_TIMEOUT);
+    }
+
+    /**
+     * Reads the next frame through a {@link StompWireFormat}.
+     *
+     * @param timeOut socket read timeout in milliseconds
+     * @return the frame that was read
+     * @throws Exception if reading or unmarshalling fails
+     */
+    public StompFrame receive(long timeOut) throws Exception {
+        stompSocket.setSoTimeout(toSocketTimeout(timeOut));
+        InputStream is = stompSocket.getInputStream();
+        StompWireFormat wf = new StompWireFormat();
+        DataInputStream dis = new DataInputStream(is);
+        return (StompFrame)wf.unmarshal(dis);
+    }
+
+    /**
+     * Reads the next frame as raw text, waiting at most
+     * {@link #RECEIVE_TIMEOUT} milliseconds on each socket read.
+     *
+     * @return the frame text without its terminator
+     * @throws Exception if reading fails
+     */
+    public String receiveFrame() throws Exception {
+        return receiveFrame(RECEIVE_TIMEOUT);
+    }
+
+    /**
+     * Reads bytes up to the next NUL byte, which must be followed by a line
+     * feed, and returns them decoded as UTF-8.
+     * <p>
+     * The internal buffer is only cleared when a frame completes: if a read
+     * fails part way through, the bytes already received stay buffered and
+     * are prepended to whatever a later call reads.
+     *
+     * @param timeOut socket read timeout in milliseconds
+     * @return the frame text without its terminator
+     * @throws IOException if the stream ends or the terminator is malformed
+     * @throws Exception if reading fails for any other reason
+     */
+    public String receiveFrame(long timeOut) throws Exception {
+        stompSocket.setSoTimeout(toSocketTimeout(timeOut));
+        InputStream is = stompSocket.getInputStream();
+        int c = 0;
+        for (;;) {
+            c = is.read();
+            if (c < 0) {
+                throw new IOException("socket closed.");
+            } else if (c == 0) {
+                c = is.read();
+                if (c != '\n') {
+                    throw new IOException("Expecting stomp frame to terminate with \0\n");
+                }
+                byte[] ba = inputBuffer.toByteArray();
+                inputBuffer.reset();
+                return new String(ba, "UTF-8");
+            } else {
+                inputBuffer.write(c);
+            }
+        }
+    }
+
+    public Socket getStompSocket() {
+        return stompSocket;
+    }
+
+    public void setStompSocket(Socket stompSocket) {
+        this.stompSocket = stompSocket;
+    }
+
+    /**
+     * Sends a CONNECT frame without a client id and waits for the reply.
+     *
+     * @param username value of the login header
+     * @param password value of the passcode header
+     * @throws Exception if the reply is not a CONNECTED frame or I/O fails
+     */
+    public void connect(String username, String password) throws Exception {
+        connect(username, password, null);
+    }
+
+    /**
+     * Sends a CONNECT frame and waits for the reply.
+     *
+     * @param username value of the login header
+     * @param password value of the passcode header
+     * @param client value of the client-id header; omitted when null
+     * @throws Exception if the reply is not a CONNECTED frame or I/O fails
+     */
+    public void connect(String username, String password, String client) throws Exception {
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("login", username);
+        headers.put("passcode", password);
+        if (client != null) {
+            headers.put("client-id", client);
+        }
+        StompFrame frame = new StompFrame("CONNECT", headers);
+        sendFrame(frame.toString());
+
+        StompFrame connect = receive();
+        // Constant first, so a reply without an action is reported as
+        // "Not connected" instead of failing with a NullPointerException.
+        if (!Stomp.Responses.CONNECTED.equals(connect.getAction())) {
+            throw new Exception ("Not connected: " + connect.getBody());
+        }
+    }
+
+    /**
+     * Sends a DISCONNECT frame. The socket itself is left open; see
+     * {@link #close()}.
+     *
+     * @throws Exception if writing to the socket fails
+     */
+    public void disconnect() throws Exception {
+        StompFrame frame = new StompFrame("DISCONNECT");
+        sendFrame(frame.toString());
+    }
+
+    /**
+     * Sends a text message outside of any transaction.
+     *
+     * @param destination value of the destination header
+     * @param message body of the frame
+     * @throws Exception if writing to the socket fails
+     */
+    public void send(String destination, String message) throws Exception {
+        send(destination, message, null, null);
+    }
+
+    /**
+     * Sends a text message.
+     * <p>
+     * The caller's header map is copied, never modified. The body is appended
+     * to the frame as text so that {@link #sendFrame(String)} encodes it as
+     * UTF-8 exactly once; it is not converted to bytes with the platform
+     * default charset first, which would lose characters that charset cannot
+     * represent.
+     *
+     * @param destination value of the destination header
+     * @param message body of the frame; must not be null
+     * @param transaction value of the transaction header; omitted when null
+     * @param headers extra headers, may be null
+     * @throws NullPointerException if {@code message} is null
+     * @throws Exception if writing to the socket fails
+     */
+    public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
+        if (message == null) {
+            throw new NullPointerException("message");
+        }
+        HashMap<String, String> frameHeaders = copyHeaders(headers);
+        frameHeaders.put("destination", destination);
+        if (transaction != null) {
+            frameHeaders.put("transaction", transaction);
+        }
+        // A frame without content renders as command, headers and the blank
+        // separator line, so the body can simply follow it.
+        StompFrame frame = new StompFrame("SEND", frameHeaders);
+        sendFrame(frame.toString() + message);
+    }
+
+    /**
+     * Subscribes to a destination without an ack header or extra headers.
+     *
+     * @param destination value of the destination header
+     * @throws Exception if writing to the socket fails
+     */
+    public void subscribe(String destination) throws Exception {
+        subscribe(destination, null, null);
+    }
+
+    /**
+     * Subscribes to a destination with the given ack mode.
+     *
+     * @param destination value of the destination header
+     * @param ack value of the ack header; omitted when null
+     * @throws Exception if writing to the socket fails
+     */
+    public void subscribe(String destination, String ack) throws Exception {
+        subscribe(destination, ack, new HashMap<String, String>());
+    }
+
+    /**
+     * Sends a SUBSCRIBE frame. The caller's header map is copied, never
+     * modified.
+     *
+     * @param destination value of the destination header
+     * @param ack value of the ack header; omitted when null
+     * @param headers extra headers, may be null
+     * @throws Exception if writing to the socket fails
+     */
+    public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
+        HashMap<String, String> frameHeaders = copyHeaders(headers);
+        frameHeaders.put("destination", destination);
+        if (ack != null) {
+            frameHeaders.put("ack", ack);
+        }
+        StompFrame frame = new StompFrame("SUBSCRIBE", frameHeaders);
+        sendFrame(frame.toString());
+    }
+
+    /**
+     * Unsubscribes from a destination without extra headers.
+     *
+     * @param destination value of the destination header
+     * @throws Exception if writing to the socket fails
+     */
+    public void unsubscribe(String destination) throws Exception {
+        unsubscribe(destination, null);
+    }
+
+    /**
+     * Sends an UNSUBSCRIBE frame. The caller's header map is copied, never
+     * modified.
+     *
+     * @param destination value of the destination header
+     * @param headers extra headers, may be null
+     * @throws Exception if writing to the socket fails
+     */
+    public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
+        HashMap<String, String> frameHeaders = copyHeaders(headers);
+        frameHeaders.put("destination", destination);
+        StompFrame frame = new StompFrame("UNSUBSCRIBE", frameHeaders);
+        sendFrame(frame.toString());
+    }
+
+    /**
+     * Sends a BEGIN frame for the given transaction.
+     *
+     * @param transaction value of the transaction header
+     * @throws Exception if writing to the socket fails
+     */
+    public void begin(String transaction) throws Exception {
+        sendTransactionFrame("BEGIN", transaction);
+    }
+
+    /**
+     * Sends an ABORT frame for the given transaction.
+     *
+     * @param transaction value of the transaction header
+     * @throws Exception if writing to the socket fails
+     */
+    public void abort(String transaction) throws Exception {
+        sendTransactionFrame("ABORT", transaction);
+    }
+
+    /**
+     * Sends a COMMIT frame for the given transaction.
+     *
+     * @param transaction value of the transaction header
+     * @throws Exception if writing to the socket fails
+     */
+    public void commit(String transaction) throws Exception {
+        sendTransactionFrame("COMMIT", transaction);
+    }
+
+    /**
+     * Acknowledges a received frame, identified by its message-id header,
+     * outside of any transaction.
+     *
+     * @param frame the frame to acknowledge
+     * @throws Exception if writing to the socket fails
+     */
+    public void ack(StompFrame frame) throws Exception {
+        ack(frame.getHeaders().get("message-id"), null);
+    }
+
+    /**
+     * Acknowledges a received frame, identified by its message-id header.
+     *
+     * @param frame the frame to acknowledge
+     * @param transaction value of the transaction header; omitted when null
+     * @throws Exception if writing to the socket fails
+     */
+    public void ack(StompFrame frame, String transaction) throws Exception {
+        ack(frame.getHeaders().get("message-id"), transaction);
+    }
+
+    /**
+     * Acknowledges a message id outside of any transaction.
+     *
+     * @param messageId value of the message-id header
+     * @throws Exception if writing to the socket fails
+     */
+    public void ack(String messageId) throws Exception {
+        ack(messageId, null);
+    }
+
+    /**
+     * Sends an ACK frame.
+     *
+     * @param messageId value of the message-id header
+     * @param transaction value of the transaction header; omitted when null
+     * @throws Exception if writing to the socket fails
+     */
+    public void ack(String messageId, String transaction) throws Exception {
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("message-id", messageId);
+        if (transaction != null) {
+            headers.put("transaction", transaction);
+        }
+        StompFrame frame = new StompFrame("ACK", headers);
+        sendFrame(frame.toString());
+    }
+
+    /**
+     * Renders headers as {@code key:value} lines followed by one blank line.
+     *
+     * @param headers the headers to render; must not be null
+     * @return the rendered header section
+     */
+    protected String appendHeaders(HashMap<String, Object> headers) {
+        StringBuilder result = new StringBuilder();
+        for (Map.Entry<String, Object> header : headers.entrySet()) {
+            result.append(header.getKey()).append(":").append(header.getValue()).append("\n");
+        }
+        result.append("\n");
+        return result.toString();
+    }
+
+    /** Sends a frame whose only header is the transaction id. */
+    private void sendTransactionFrame(String command, String transaction) throws Exception {
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("transaction", transaction);
+        StompFrame frame = new StompFrame(command, headers);
+        sendFrame(frame.toString());
+    }
+
+    /**
+     * Returns a private, mutable copy of the caller's headers (empty for
+     * null). clone() is used so that the copy keeps the runtime class of the
+     * argument, and with it any iteration order that class provides.
+     */
+    @SuppressWarnings("unchecked")
+    private static HashMap<String, String> copyHeaders(HashMap<String, String> headers) {
+        if (headers == null) {
+            return new HashMap<String, String>();
+        }
+        // Safe: cloning a HashMap<String, String> yields the same element types.
+        return (HashMap<String, String>)headers.clone();
+    }
+
+    /** Narrows a millisecond timeout to int by clamping instead of wrapping. */
+    private static int toSocketTimeout(long timeOut) {
+        if (timeOut > Integer.MAX_VALUE) {
+            return Integer.MAX_VALUE;
+        }
+        if (timeOut < Integer.MIN_VALUE) {
+            return Integer.MIN_VALUE;
+        }
+        return (int)timeOut;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Represents all the data in a STOMP frame: the action (command name), the
+ * headers and the raw content bytes.
+ * <p>
+ * The header map and content array are held by reference; nothing is copied
+ * on the way in or out. Apart from {@link #toString()}, every method of the
+ * {@link Command} interface is implemented as a stub that returns a fixed
+ * value or does nothing.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrame implements Command {
+
+    /** Shared empty content used when a frame has no body. */
+    public static final byte[] NO_DATA = new byte[] {};
+
+    private String action;
+    private Map<String, String> headers = new HashMap<String, String>();
+    private byte[] content = NO_DATA;
+
+    /**
+     * Creates a frame with no headers and no content.
+     *
+     * @param command the action of the frame
+     */
+    public StompFrame(String command) {
+        this(command, null, null);
+    }
+
+    /**
+     * Creates a frame with no content.
+     *
+     * @param command the action of the frame
+     * @param headers header map to use; an empty map is used when null
+     */
+    public StompFrame(String command, Map<String, String> headers) {
+        this(command, headers, null);
+    }
+
+    /**
+     * Creates a frame.
+     *
+     * @param command the action of the frame
+     * @param headers header map to use; an empty map is used when null
+     * @param data content to use; {@link #NO_DATA} is used when null
+     */
+    public StompFrame(String command, Map<String, String> headers, byte[] data) {
+        this.action = command;
+        if (headers != null) {
+            this.headers = headers;
+        }
+        if (data != null) {
+            this.content = data;
+        }
+    }
+
+    /** Creates a frame with a null action, no headers and no content. */
+    public StompFrame() {
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public void setAction(String command) {
+        this.action = command;
+    }
+
+    public byte[] getContent() {
+        return content;
+    }
+
+    /**
+     * Returns the content as text, decoded with the platform default
+     * charset. A frame whose content was set to null yields an empty string,
+     * matching how {@link #toString()} treats null content.
+     * <p>
+     * NOTE(review): the default charset is kept because the code that fills
+     * in the content is not visible here - confirm which encoding producers
+     * use before switching to an explicit one.
+     *
+     * @return the body text, never null
+     */
+    public String getBody() {
+        if (content == null) {
+            return "";
+        }
+        return new String(content);
+    }
+
+    public void setContent(byte[] data) {
+        this.content = data;
+    }
+
+    public Map<String, String> getHeaders() {
+        return headers;
+    }
+
+    public void setHeaders(Map<String, String> headers) {
+        this.headers = headers;
+    }
+
+    //
+    // Methods in the Command interface
+    //
+    public int getCommandId() {
+        return 0;
+    }
+
+    public Endpoint getFrom() {
+        return null;
+    }
+
+    public Endpoint getTo() {
+        return null;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    public void setCommandId(int value) {
+    }
+
+    public void setFrom(Endpoint from) {
+    }
+
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    public void setTo(Endpoint to) {
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return null;
+    }
+
+    public byte getDataStructureType() {
+        return 0;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    /**
+     * Renders the frame as text: the action on the first line, one
+     * {@code key:value} line per header, a blank line, then the content
+     * decoded with the platform default charset. The terminating NUL byte is
+     * not included. Null headers or null content are rendered as absent.
+     *
+     * @return the textual form of the frame
+     */
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append(getAction());
+        buffer.append("\n");
+        // Wildcard types: entries are only appended as objects, so no cast is needed.
+        Map<?, ?> currentHeaders = getHeaders();
+        if (currentHeaders != null) {
+            for (Map.Entry<?, ?> entry : currentHeaders.entrySet()) {
+                buffer.append(entry.getKey());
+                buffer.append(":");
+                buffer.append(entry.getValue());
+                buffer.append("\n");
+            }
+        }
+        buffer.append("\n");
+        byte[] body = getContent();
+        if (body != null) {
+            // new String(byte[]) substitutes undecodable bytes instead of
+            // throwing, so no fallback rendering is required.
+            buffer.append(new String(body));
+        }
+        return buffer.toString();
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompFrameError.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+/**
+ * Command indicating that an invalid Stomp Frame was received.
+ * <p>
+ * It carries the {@link ProtocolException} describing the problem. The frame
+ * part is created through StompFrame's no-argument constructor, so no action,
+ * headers or content are set by this class.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrameError extends StompFrame {
+
+    /** The exception supplied at construction time; never reassigned. */
+    private final ProtocolException exception;
+
+    /**
+     * @param exception the protocol exception to carry; stored as given,
+     *            without a null check
+     */
+    public StompFrameError(ProtocolException exception) {
+        this.exception = exception;
+    }
+
+    /**
+     * @return the exception passed to the constructor
+     */
+    public ProtocolException getException() {
+        return exception;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java?rev=781822&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java Thu Jun  4 18:19:18 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.util.Map;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.SslTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.xbean.XBeanBrokerService;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * A <a href="http://stomp.codehaus.org/">STOMP</a> over SSL transport factory
+ * <p>
+ * It extends the SSL transport factory so that every configured transport is
+ * wrapped in a {@link StompTransportFilter}.
+ * 
+ * @version $Revision: 645574 $
+ */
+public class StompSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
+
+    // Stays null unless setBrokerService is given an XBeanBrokerService.
+    private ApplicationContext applicationContext = null;
+
+    /**
+     * @return the wire format name "stomp"
+     */
+    protected String getDefaultWireFormatType() {
+        return "stomp";
+    }
+
+    /**
+     * Wraps the transport in a {@link StompTransportFilter} that uses a
+     * {@link LegacyFrameTranslator}, applies the options to that filter, and
+     * then lets the superclass continue the configuration.
+     * <p>
+     * NOTE(review): the filter receives whatever applicationContext holds at
+     * this moment, which is null if setBrokerService has not been called with
+     * an XBeanBrokerService yet - confirm the call order.
+     *
+     * @param transport the transport to wrap
+     * @param format the wire format, passed through to the superclass
+     * @param options configuration options applied to the filter and then
+     *            passed through to the superclass
+     * @return the result of the superclass configuration of the wrapped transport
+     */
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), applicationContext);
+        // Properties are set on the filter, not on the transport it wraps.
+        IntrospectionSupport.setProperties(transport, options);
+        return super.compositeConfigure(transport, format, options);
+    }
+
+    /**
+     * Remembers the application context of the broker when the broker is an
+     * {@link XBeanBrokerService}; for any other broker type the field is left
+     * unchanged.
+     *
+     * @param brokerService the broker service this factory belongs to
+     */
+    public void setBrokerService(BrokerService brokerService) {
+        if (brokerService instanceof XBeanBrokerService) {
+            this.applicationContext = ((XBeanBrokerService)brokerService).getApplicationContext();
+        }
+    }
+
+}



Mime
View raw message