activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r418550 - in /incubator/activemq/trunk/activemq-core/src/main: java/org/apache/activemq/transport/stomp2/ resources/META-INF/services/org/apache/activemq/transport/ resources/META-INF/services/org/apache/activemq/wireformat/
Date Sun, 02 Jul 2006 04:18:46 GMT
Author: chirino
Date: Sat Jul  1 21:18:44 2006
New Revision: 418550

URL: http://svn.apache.org/viewvc?rev=418550&view=rev
Log:
Added a new/highly refactored version of the STOMP protocol implementation.

The biggest difference between this and the previous implementation is that conversion between the STOMP protocol and 
the ActiveMQ protocol happens at a Transport Filter layer instead of doing it all at the WireFormat layer.

I think this has resulted in cleaner code since there's a better separation between marshalling/unmarshalling of 
the STOMP protocol and converting the STOMP protocol to the ActiveMQ protocol.


Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
    incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ProtocolConverter.java Sat Jul  1 21:18:44 2006
@@ -0,0 +1,627 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.ProtocolException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a> 
+ */
+public class ProtocolConverter {
+	
+    // Ids used for every command this converter sends to ActiveMQ. One connection,
+    // one session (sequence -1) and one producer (sequence 1) are created per converter instance.
+    private static final IdGenerator connectionIdGenerator = new IdGenerator();
+    private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
+    private final SessionId sessionId = new SessionId(connectionId, -1);
+    private final ProducerId producerId = new ProducerId(sessionId, 1);
+    
+    // Sequence sources for the consumer, message and local-transaction ids created below.
+    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
+	
+    // Integer command id -> ResponseHandler waiting for the matching ActiveMQ Response.
+    private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap();
+    // ConsumerId -> StompSubscription, used to route dispatched messages and ACKs.
+    private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap();
+    // STOMP transaction name (String) -> ActiveMQ TransactionId.
+    private final Map transactions = new ConcurrentHashMap();
+	private StompTransportFilter transportFilter;
+	
+	// Guards lastCommandId; see generateCommandId().
+	private final Object commnadIdMutex = new Object();
+	private int lastCommandId;
+    // Set to true once the CONNECT handshake completes and back to false on DISCONNECT.
+    private final AtomicBoolean connected = new AtomicBoolean(false);
+
+    protected int generateCommandId() {
+    	synchronized(commnadIdMutex){
+    		return lastCommandId++;
+    	}
+    }
+
+    /**
+     * Builds the callback that acknowledges a STOMP frame once ActiveMQ has
+     * responded to the command the frame was converted into.
+     *
+     * @param command the STOMP frame being processed
+     * @return a handler that sends a RECEIPT frame carrying the requested
+     *         receipt id, or null when the frame did not ask for a receipt
+     */
+    protected ResponseHandler createResponseHandler(StompCommand command){
+        final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        // A response may not be needed.
+        if( receiptId == null ) {
+            return null;
+        }
+        return new ResponseHandler() {
+            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                StompCommand sc = new StompCommand();
+                // A default-constructed StompCommand has a null action, so the
+                // frame must be tagged explicitly as a receipt.
+                sc.setAction(Stomp.Responses.RECEIPT);
+                sc.setHeaders(new HashMap(5));
+                sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                transportFilter.sendToStomp(sc);
+            }
+        };
+    }
+    
+	protected void sendToActiveMQ(Command command, ResponseHandler handler) {
+		command.setCommandId(generateCommandId());
+		if(handler!=null) {
+			command.setResponseRequired(true);
+			resposeHandlers.put(new Integer(command.getCommandId()), handler);
+		}
+		transportFilter.sendToActiveMQ(command);
+	}
+
+	/**
+	 * Passes a STOMP frame to the transport filter's sendToStomp.
+	 *
+	 * @param command the frame to send
+	 * @throws IOException if the transport filter reports one
+	 */
+	protected void sendToStomp(StompCommand command) throws IOException {
+		transportFilter.sendToStomp(command);
+	}
+
+	/**
+     * Dispatches an incoming STOMP frame to the handler matching its action.
+     * <p>
+     * The action is compared with startsWith against the Stomp.Commands
+     * constants in the order written below. A ProtocolException raised while
+     * handling the frame is not propagated: it is reported through sendToStomp
+     * as an ERROR frame whose body is the UTF-8 encoded stack trace and which
+     * echoes the receipt id when the frame requested one.
+     *
+     * @param command the STOMP frame to process
+     * @throws IOException if a handler or the sending of the ERROR frame reports one
+     * @throws JMSException if a handler reports one
+     */
+	public void onStompCommad( StompCommand command ) throws IOException, JMSException {
+		try {
+			
+			String action = command.getAction();
+	        if (action.startsWith(Stomp.Commands.SEND))
+	            onStompSend(command);
+	        else if (action.startsWith(Stomp.Commands.ACK))
+	            onStompAck(command);
+	        else if (action.startsWith(Stomp.Commands.BEGIN))
+	            onStompBegin(command);
+	        else if (action.startsWith(Stomp.Commands.COMMIT))
+	            onStompCommit(command);
+	        else if (action.startsWith(Stomp.Commands.ABORT))
+	            onStompAbort(command);
+	        else if (action.startsWith(Stomp.Commands.SUBSCRIBE))
+	            onStompSubscribe(command);
+	        else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE))
+	            onStompUnsubscribe(command);
+			else if (action.startsWith(Stomp.Commands.CONNECT))
+	            onStompConnect(command);
+	        else if (action.startsWith(Stomp.Commands.DISCONNECT))
+	            onStompDisconnect(command);
+	        else
+	        	throw new ProtocolException("Unknown STOMP action: "+action);
+	        
+        } catch (ProtocolException e) {
+        	
+        	// Let the stomp client know about any protocol errors.
+        	// The stack trace becomes the body of the ERROR frame.
+        	ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        	PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8"));
+        	e.printStackTrace(stream);
+        	stream.close();
+
+        	HashMap headers = new HashMap();
+        	headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+        	
+        	// Echo the receipt id so the client can correlate the error with its request.
+            final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+            if( receiptId != null ) {            	
+            	headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            }
+        	
+        	StompCommand errorMessage = new StompCommand(Stomp.Responses.ERROR,headers,baos.toByteArray());
+			sendToStomp(errorMessage);
+			
+        }
+	}
+	
+	/**
+	 * Handles a STOMP SEND frame: converts it to an ActiveMQ message, stamps
+	 * it with this converter's producer id, a new message id and the current
+	 * time, optionally attaches it to a transaction, and sends it to ActiveMQ.
+	 *
+	 * @param command the SEND frame
+	 * @throws ProtocolException if not connected or the transaction header names an unknown transaction
+	 * @throws IOException if message conversion reports one
+	 * @throws JMSException if message conversion reports one
+	 */
+	protected void onStompSend(StompCommand command) throws IOException, JMSException {
+		checkConnected();
+
+    	Map headers = command.getHeaders();
+        // Read before convertMessage(), which hands the remaining headers to the message as properties.
+        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+
+        ActiveMQMessage message = convertMessage(command);
+
+        message.setProducerId(producerId);
+        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
+        message.setMessageId(id);
+        message.setJMSTimestamp(System.currentTimeMillis());
+
+        if (stompTx!=null) {
+        	// Map the STOMP transaction name to the ActiveMQ id registered by BEGIN.
+        	TransactionId activemqTx = (TransactionId) transactions.get(stompTx);
+            if (activemqTx == null)
+                throw new ProtocolException("Invalid transaction id: "+stompTx);
+            message.setTransactionId(activemqTx);
+        }
+		
+        message.onSend();
+		sendToActiveMQ(message, createResponseHandler(command));
+		
+	}
+	
+
+    /**
+     * Handles a STOMP ACK frame. Each subscription is asked in turn to build a
+     * MessageAck for the given message id; the first non-null ack is sent to
+     * ActiveMQ (within the named transaction, if any) and the search stops.
+     *
+     * @param command the ACK frame
+     * @throws ProtocolException if not connected, the message-id header is
+     *         missing, the transaction is unknown, or no subscription
+     *         produced an ack for the message id
+     */
+    protected void onStompAck(StompCommand command) throws ProtocolException {
+		checkConnected();
+
+    	// TODO: acking with just a message id is very bogus
+    	// since the same message id could have been sent to 2 different subscriptions
+    	// on the same stomp connection. For example, when 2 subs are created on the same topic.
+    	
+    	Map headers = command.getHeaders();
+        String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+        if (messageId == null)
+            throw new ProtocolException("ACK received without a message-id to acknowledge!");
+
+        // The transaction is optional; activemqTx stays null when none is named.
+        TransactionId activemqTx=null;
+        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx!=null) {
+        	activemqTx = (TransactionId) transactions.get(stompTx);
+            if (activemqTx == null)
+                throw new ProtocolException("Invalid transaction id: "+stompTx);
+        }
+
+        boolean acked=false;
+        for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+			StompSubscription sub = (StompSubscription) iter.next();
+			MessageAck ack = sub.onStompMessageAck(messageId);
+			if( ack!=null ) {
+		        ack.setTransactionId(activemqTx);
+		        sendToActiveMQ(ack,createResponseHandler(command));
+		        acked=true;
+		        // Only the first subscription that produces an ack is acknowledged.
+		        break;
+			}
+		}
+        
+        if( !acked )
+        	throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
+
+	}
+    
+
+	protected void onStompBegin(StompCommand command) throws ProtocolException {
+		checkConnected();
+
+		Map headers = command.getHeaders();
+		
+        String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
+        
+        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
+            throw new ProtocolException("Must specify the transaction you are beginning");
+        }
+        
+        if( transactions.get(stompTx)!=null  ) {
+            throw new ProtocolException("The transaction was allready started: "+stompTx);
+        }
+        
+        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
+        transactions.put(stompTx, activemqTx);
+        
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connectionId);
+        tx.setTransactionId(activemqTx);
+        tx.setType(TransactionInfo.BEGIN);
+        
+		sendToActiveMQ(tx, createResponseHandler(command));
+		
+	}
+	
+	protected void onStompCommit(StompCommand command) throws ProtocolException {
+		checkConnected();
+
+		Map headers = command.getHeaders();
+		
+        String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx==null) {
+            throw new ProtocolException("Must specify the transaction you are committing");
+        }
+        
+        TransactionId activemqTx=null;
+        if (stompTx!=null) {
+        	activemqTx = (TransactionId) transactions.remove(stompTx);
+            if (activemqTx == null)
+                throw new ProtocolException("Invalid transaction id: "+stompTx);
+        }
+
+        TransactionInfo tx = new TransactionInfo();
+        tx.setConnectionId(connectionId);
+        tx.setTransactionId(activemqTx);
+        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
+		
+		sendToActiveMQ(tx, createResponseHandler(command));
+	}
+
+	/**
+	 * Handles a STOMP ABORT frame: removes the named transaction from the
+	 * local registry and sends a rollback for it to ActiveMQ.
+	 *
+	 * @param command the ABORT frame
+	 * @throws ProtocolException if not connected, the transaction header is
+	 *         missing, or no transaction is registered under that name
+	 */
+	protected void onStompAbort(StompCommand command) throws ProtocolException {
+		checkConnected();
+		Map headers = command.getHeaders();
+
+		String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
+		if (stompTx==null) {
+			// Previously reported "committing", copied from the COMMIT handler.
+			throw new ProtocolException("Must specify the transaction you are aborting");
+		}
+
+		// The mapping is dropped as soon as the rollback is requested.
+		TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
+		if (activemqTx == null) {
+			throw new ProtocolException("Invalid transaction id: "+stompTx);
+		}
+
+		TransactionInfo tx = new TransactionInfo();
+		tx.setConnectionId(connectionId);
+		tx.setTransactionId(activemqTx);
+		tx.setType(TransactionInfo.ROLLBACK);
+
+		sendToActiveMQ(tx, createResponseHandler(command));
+	}
+
+	/**
+	 * Handles a STOMP SUBSCRIBE frame: creates an ActiveMQ consumer for the
+	 * requested destination, registers a StompSubscription for it under the
+	 * new consumer id and sends the ConsumerInfo to ActiveMQ.
+	 * <p>
+	 * Headers prefixed with "activemq." are applied to the ConsumerInfo via
+	 * IntrospectionSupport. The selector header is removed from the frame's
+	 * header map as a side effect.
+	 *
+	 * @param command the SUBSCRIBE frame
+	 * @throws ProtocolException if not connected, or the destination header
+	 *         is missing or does not start with /queue/ or /topic/
+	 */
+	protected void onStompSubscribe(StompCommand command) throws ProtocolException {
+		checkConnected();
+    	Map headers = command.getHeaders();
+        
+        String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID);
+        String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
+        
+        ActiveMQDestination actual_dest = convertDestination(destination);
+        if (actual_dest == null) {
+            // convertDestination() maps a missing header to null; a consumer
+            // without a destination must not be sent to the broker.
+            throw new ProtocolException("Must specify the destination you are subscribing to");
+        }
+
+        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+        ConsumerInfo consumerInfo = new ConsumerInfo(id);
+        consumerInfo.setPrefetchSize(1000);
+        consumerInfo.setDispatchAsync(true);
+
+        String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
+        consumerInfo.setSelector(selector);
+        
+        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
+        
+        // Set after the "activemq." overrides so the STOMP destination always wins.
+        consumerInfo.setDestination(convertDestination(destination));
+                
+        StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
+        stompSubscription.setDestination(actual_dest);
+        
+        // Anything other than an explicit "client" ack mode falls back to auto-ack.
+        String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
+            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
+        } else {
+            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
+        }
+
+        subscriptionsByConsumerId.put(id, stompSubscription);
+		sendToActiveMQ(consumerInfo, createResponseHandler(command));
+		
+	}
+
+	/**
+	 * Handles a STOMP UNSUBSCRIBE frame. The first registered subscription
+	 * whose subscription id or destination matches the frame is removed: a
+	 * remove command for its consumer is sent to ActiveMQ and the
+	 * subscription is dropped from the local registry.
+	 *
+	 * @param command the UNSUBSCRIBE frame
+	 * @throws ProtocolException if not connected, neither an id nor a
+	 *         destination is given, the destination is malformed, or no
+	 *         subscription matches
+	 */
+	protected void onStompUnsubscribe(StompCommand command) throws ProtocolException {
+		checkConnected();
+    	Map headers = command.getHeaders();
+
+        ActiveMQDestination destination=null;
+        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
+        if( o!=null ) 
+        	destination =convertDestination((String) o);
+        
+        String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
+        
+        if (subscriptionId==null && destination==null) {
+            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
+        }
+
+        // TODO: Unsubscribing using a destination is a bit weird if multiple subscriptions
+        // are created with the same destination.  Perhaps this should be removed.
+        //
+        for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+			StompSubscription sub = (StompSubscription) iter.next();
+			if ( 
+				(subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) ||
+				(destination!=null && destination.equals(sub.getDestination()) )
+			) {
+		        sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+		        // Forget the subscription; otherwise it stays registered forever and
+		        // keeps being considered by later ACK and UNSUBSCRIBE frames.
+		        iter.remove();
+				return;
+			}
+		}
+        
+        throw new ProtocolException("No subscription matched.");
+	}
+
+	/**
+	 * Handles a STOMP CONNECT frame by performing the ActiveMQ handshake in
+	 * stages: a ConnectionInfo is sent first; when its response arrives a
+	 * SessionInfo and a ProducerInfo are sent; when the ProducerInfo response
+	 * arrives the converter is marked connected and a CONNECTED frame is sent
+	 * to the STOMP client.
+	 * <p>
+	 * Headers prefixed with "activemq." are applied to the ConnectionInfo via
+	 * IntrospectionSupport. When no client id header is given, the connection
+	 * id is used as the client id.
+	 *
+	 * @param command the CONNECT frame
+	 * @throws ProtocolException if this converter is already connected
+	 */
+	protected void onStompConnect(StompCommand command) throws ProtocolException {
+
+		if(connected.get()) {
+			throw new ProtocolException("Allready connected.");
+		}
+
+    	final Map headers = command.getHeaders();
+        
+        // allow anyone to login for now
+        String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+        String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+        String clientId = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
+        
+        final ConnectionInfo connectionInfo = new ConnectionInfo();
+        
+        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
+        
+        connectionInfo.setConnectionId(connectionId);
+        if( clientId!=null )
+            connectionInfo.setClientId(clientId);
+        else
+            connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
+        
+        connectionInfo.setResponseRequired(true);
+        connectionInfo.setUserName(login);
+        connectionInfo.setPassword(passcode);
+
+		// Stage 1: register the connection and wait for the response.
+		sendToActiveMQ(connectionInfo, new ResponseHandler(){
+			public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+					            
+	            // Stage 2: the session is sent without waiting for a response.
+	            final SessionInfo sessionInfo = new SessionInfo(sessionId);
+	            sendToActiveMQ(sessionInfo,null);
+	            
+	            
+	            // Stage 3: the producer's response completes the handshake.
+	            final ProducerInfo producerInfo = new ProducerInfo(producerId);
+	            sendToActiveMQ(producerInfo,new ResponseHandler(){
+					public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+						
+						connected.set(true);
+	                    HashMap responseHeaders = new HashMap();
+	                    
+	                    // The client id is reported back as the STOMP session.
+	                    responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
+	                    String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
+	                    if( requestId !=null ){
+		                    responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
+	            		}
+	                    
+	                    StompCommand sc = new StompCommand();
+	                    sc.setAction(Stomp.Responses.CONNECTED);
+	                    sc.setHeaders(responseHeaders);
+	                    sendToStomp(sc);
+					}
+				});
+	            
+			}
+		});
+		
+	}
+
+	/**
+	 * Handles a STOMP DISCONNECT frame: sends a ShutdownInfo to ActiveMQ and
+	 * marks this converter as no longer connected.
+	 *
+	 * @param command the DISCONNECT frame
+	 * @throws ProtocolException if not connected
+	 */
+	protected void onStompDisconnect(StompCommand command) throws ProtocolException {
+		checkConnected();
+		sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
+		connected.set(false);
+	}
+
+
+	protected void checkConnected() throws ProtocolException {
+		if(!connected.get()) {
+			throw new ProtocolException("Not connected.");
+		}
+	}
+
+	/**
+     * Processes a command received from ActiveMQ.
+     * <p>
+     * A Response is routed to the ResponseHandler registered under its
+     * correlation id (the handler is removed, so it runs at most once). A
+     * MessageDispatch is routed to the subscription registered for its
+     * consumer id. Responses without a handler, dispatches without a
+     * subscription and all other command types are ignored.
+     *
+     * @param command the command received from ActiveMQ
+     * @throws IOException if a handler or subscription reports one
+     * @throws JMSException if a subscription reports one
+     */
+	public void onActiveMQCommad( Command command ) throws IOException, JMSException {
+		
+    	if ( command.isResponse() ) {
+		    
+			Response response = (Response) command;
+		    ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer(response.getCorrelationId()));
+		    if( rh !=null ) {
+		    	rh.onResponse(this, response);
+		    }
+		    
+		} else if( command.isMessageDispatch() ) {
+			
+		    MessageDispatch md = (MessageDispatch)command;
+		    StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId());
+		    if (sub != null)
+		        sub.onMessageDispatch(md);
+		    
+		}
+		
+	}
+
+	/**
+	 * Converts a STOMP SEND frame into an ActiveMQ message.
+	 * <p>
+	 * A frame carrying a content-length header becomes a bytes message; any
+	 * other frame becomes a text message decoded as UTF-8. The headers this
+	 * method recognises are removed from the frame's header map (the map is
+	 * modified in place) and whatever remains is set as the message properties.
+	 *
+	 * @param command the STOMP frame to convert
+	 * @return the converted message; no message id, producer id or timestamp is set here
+	 * @throws ProtocolException if the text body cannot be set or the
+	 *         destination / reply-to name is not a /queue/ or /topic/ name
+	 * @throws IOException if setting the message properties reports one
+	 * @throws JMSException if writing the bytes body reports one
+	 */
+	public  ActiveMQMessage convertMessage(StompCommand command) throws IOException, JMSException {
+		Map headers = command.getHeaders();
+        
+        // now the body
+        ActiveMQMessage msg;
+        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
+            headers.remove(Stomp.Headers.CONTENT_LENGTH);
+            ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
+            bm.writeBytes(command.getContent());
+            msg = bm;
+        } else {
+            ActiveMQTextMessage text = new ActiveMQTextMessage();
+            try {
+				text.setText(new String(command.getContent(), "UTF-8"));
+			} catch (Throwable e) {
+				throw (ProtocolException)new ProtocolException("Text could not bet set: "+e).initCause(e);
+			}
+            msg = text;
+        }
+
+        // A missing destination header yields a null destination here.
+        String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
+        msg.setDestination(convertDestination(destination));
+
+        // the standard JMS headers
+        msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+
+        // NOTE(review): parseLong/parseInt below throw NumberFormatException on a
+        // malformed header; that is not a ProtocolException, so it presumably is not
+        // reported to the client as an ERROR frame -- confirm against the caller.
+        Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
+        if (o != null) {
+            msg.setJMSExpiration(Long.parseLong((String) o));
+        }
+        
+        o = headers.remove(Stomp.Headers.Send.PRIORITY);
+        if (o != null) {
+            msg.setJMSPriority(Integer.parseInt((String)o));
+        }
+        
+        o = headers.remove(Stomp.Headers.Send.TYPE);
+        if (o != null) {
+            msg.setJMSType((String) o);
+        }
+        
+        o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+        if( o!=null ) {
+        	msg.setJMSReplyTo(convertDestination((String)o));
+        }
+
+        // Only the exact string "true" marks the message persistent.
+        o = headers.remove(Stomp.Headers.Send.PERSISTENT);
+        if (o != null) {
+            msg.setPersistent("true".equals(o));
+        }
+        
+        // now the general headers
+        msg.setProperties(headers);
+        
+        return msg;        
+	}
+	
+	/**
+	 * Converts an ActiveMQ message into a STOMP MESSAGE frame.
+	 * <p>
+	 * The standard JMS headers are copied first and the message properties
+	 * are added afterwards, so a property with the same key replaces the
+	 * standard header. Text messages become a UTF-8 body; bytes messages
+	 * become a raw body with a content-length header. Other message types
+	 * leave the frame's content untouched.
+	 *
+	 * @param message the ActiveMQ message to convert
+	 * @return the MESSAGE frame
+	 * @throws IOException if reading the message properties reports one
+	 * @throws JMSException if reading the message body reports one
+	 */
+	public StompCommand convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+
+		StompCommand command = new StompCommand();
+		command.setAction(Stomp.Responses.MESSAGE);
+		
+		HashMap headers = new HashMap();
+		command.setHeaders(headers);
+		
+        // NOTE(review): correlation id, reply-to and type are stored even when they are
+        // null, so the header map may contain null values -- confirm the marshaller copes.
+        headers.put(Stomp.Headers.Message.DESTINATION, convertDestination(message.getDestination()));
+        headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
+        headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
+        headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
+        if (message.getJMSRedelivered()) {
+            headers.put(Stomp.Headers.Message.REDELIVERED, "true");
+        }
+        headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
+        headers.put(Stomp.Headers.Message.REPLY_TO, convertDestination(message.getJMSReplyTo()));
+        headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
+        headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
+
+        // now lets add all the message headers
+        Map properties = message.getProperties();
+        if (properties != null) {
+            headers.putAll(properties);
+        }
+        
+        // The body is read from a copy of the message rather than the message itself.
+        if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
+        	
+            ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+            command.setContent(msg.getText().getBytes("UTF-8"));
+            
+        } else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
+            
+        	ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+            byte[] data = new byte[(int)msg.getBodyLength()]; 
+            msg.readBytes(data);
+
+            headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
+            command.setContent(data);
+            
+        }
+
+        return command;		
+	}
+	
+    protected ActiveMQDestination convertDestination(String name) throws ProtocolException {
+        if (name == null) {
+            return null;
+        }
+        else if (name.startsWith("/queue/")) {
+            String q_name = name.substring("/queue/".length(), name.length());
+            return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
+        }
+        else if (name.startsWith("/topic/")) {
+            String t_name = name.substring("/topic/".length(), name.length());
+            return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
+        }
+        else {
+            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + "must begine with /queue/ or /topic/");
+        }
+
+    }
+
+    protected String convertDestination(Destination d) {
+        if (d == null) {
+            return null;
+        }
+        ActiveMQDestination amq_d = (ActiveMQDestination) d;
+        String p_name = amq_d.getPhysicalName();
+
+        StringBuffer buffer = new StringBuffer();
+        if (amq_d.isQueue()) {
+            buffer.append("/queue/");
+        }
+        if (amq_d.isTopic()) {
+            buffer.append("/topic/");
+        }
+        buffer.append(p_name);
+
+        return buffer.toString();
+    }
+
+	/**
+	 * @return the transport filter this converter sends its commands through
+	 */
+	public StompTransportFilter getTransportFilter() {
+		return transportFilter;
+	}
+
+	/**
+	 * @param transportFilter the transport filter used by sendToActiveMQ and sendToStomp
+	 */
+	public void setTransportFilter(StompTransportFilter transportFilter) {
+		this.transportFilter = transportFilter;
+	}
+	
+	/**
+	 * Placeholder for handling an IOException; currently does nothing.
+	 *
+	 * @param error the reported exception (ignored)
+	 */
+	public void onStompExcepton(IOException error) {
+		// TODO Auto-generated method stub
+	}
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/ResponseHandler.java Sat Jul  1 21:18:44 2006
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.Response;
+
+/**
+ * Interface used by the ProtocolConverter for callbacks.
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a> 
+ */
+interface ResponseHandler {
+    /**
+     * Invoked by the converter when the response to a previously sent command arrives.
+     *
+     * @param converter the converter that received the response
+     * @param response the response received from ActiveMQ
+     * @throws IOException if the handler fails while reacting to the response
+     */
+    void onResponse(ProtocolConverter converter, Response response) throws IOException;
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompCommand.java Sat Jul  1 21:18:44 2006
@@ -0,0 +1,149 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Represents all the data in a STOMP frame.
+ * <p>
+ * The class implements Command, but only the action, headers and content
+ * carry state. Every method of the Command interface is stubbed: getters
+ * return 0, null or false and setters discard their argument.
+ * 
+ * @author <a href="http://hiramchirino.com">chirino</a> 
+ */
+public class StompCommand implements Command {
+
+    private static final byte[] NO_DATA = new byte[]{};
+
+	// Null until set through the constructor or setAction().
+	private String action;
+	// Defaults to the shared immutable empty map; callers that need to add
+	// headers must install their own map with setHeaders() first.
+	private Map headers = Collections.EMPTY_MAP;
+	private byte[] content = NO_DATA;
+
+	/**
+	 * Creates a fully populated frame. The arguments are stored as given, without copying.
+	 *
+	 * @param command the frame's action
+	 * @param headers the frame's headers
+	 * @param data the frame's body
+	 */
+	public StompCommand(String command, HashMap headers, byte[] data) {
+		this.action = command;
+		this.headers = headers;
+		this.content = data;
+	}
+
+	/**
+	 * Creates a frame with a null action, an empty immutable header map and an empty body.
+	 */
+	public StompCommand() {
+	}
+
+	public String getAction() {
+		return action;
+	}
+
+	public void setAction(String command) {
+		this.action = command;
+	}
+
+	public byte[] getContent() {
+		return content;
+	}
+
+	public void setContent(byte[] data) {
+		this.content = data;
+	}
+
+	public Map getHeaders() {
+		return headers;
+	}
+
+	public void setHeaders(Map headers) {
+		this.headers = headers;
+	}
+
+	//
+	// Methods in the Command interface
+	//
+	// All of the methods below are stubs: the values passed to the setters are
+	// not stored and the getters return fixed defaults.
+	//
+	public int getCommandId() {
+		return 0;
+	}
+
+	public Endpoint getFrom() {
+		return null;
+	}
+
+	public Endpoint getTo() {
+		return null;
+	}
+
+	public boolean isBrokerInfo() {
+		return false;
+	}
+
+	public boolean isMessage() {
+		return false;
+	}
+
+	public boolean isMessageAck() {
+		return false;
+	}
+
+	public boolean isMessageDispatch() {
+		return false;
+	}
+
+	public boolean isMessageDispatchNotification() {
+		return false;
+	}
+
+	public boolean isResponse() {
+		return false;
+	}
+
+	public boolean isResponseRequired() {
+		return false;
+	}
+
+	public boolean isShutdownInfo() {
+		return false;
+	}
+
+	public boolean isWireFormatInfo() {
+		return false;
+	}
+
+	public void setCommandId(int value) {
+	}
+
+	public void setFrom(Endpoint from) {
+	}
+
+	public void setResponseRequired(boolean responseRequired) {
+	}
+
+	public void setTo(Endpoint to) {
+	}
+
+	public Response visit(CommandVisitor visitor) throws Exception {
+		return null;
+	}
+
+	public byte getDataStructureType() {
+		return 0;
+	}
+
+	public boolean isMarshallAware() {
+		return false;
+	}
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompSubscription.java Sat Jul  1 21:18:44 2006
@@ -0,0 +1,136 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transport.stomp.Stomp;
+
+/**
+ * Keeps track of a STOMP subscription so that acking is correctly done.
+ * <p>
+ * While the subscription is in client-ack mode, every dispatched message is
+ * remembered (in dispatch order) until the STOMP client acknowledges it.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompSubscription {
+
+    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
+    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
+
+    private final ProtocolConverter protocolConverter;
+    private final String subscriptionId;
+    private final ConsumerInfo consumerInfo;
+
+    /**
+     * JMS message id (String) -> MessageId of the messages dispatched in
+     * client-ack mode and not yet acknowledged. Insertion order is the
+     * dispatch order. Guarded by <code>this</code>.
+     */
+    private final LinkedHashMap dispatchedMessage = new LinkedHashMap();
+
+    private String ackMode = AUTO_ACK;
+    private ActiveMQDestination destination;
+
+    /**
+     * @param stompTransport converter used to translate messages and to reach the transport filter
+     * @param subscriptionId id put in the subscription header of delivered frames; may be null
+     * @param consumerInfo consumer whose id and destination are used when building acks
+     */
+    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo) {
+        this.protocolConverter = stompTransport;
+        this.subscriptionId = subscriptionId;
+        this.consumerInfo = consumerInfo;
+    }
+
+    /**
+     * Handles a message dispatched to this subscription: records it (client
+     * ack) or acknowledges it straight away (auto ack), then converts it to a
+     * STOMP MESSAGE frame and sends it to the STOMP client.
+     *
+     * @param md the dispatch to deliver
+     * @throws IOException if sending the frame fails
+     * @throws JMSException if the message cannot be converted
+     */
+    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+
+        ActiveMQMessage message = (ActiveMQMessage) md.getMessage();
+
+        // Compare ack modes by value, not identity: a mode string passed to
+        // setAckMode() is not guaranteed to be the same instance as the
+        // constants, and with == such a value would match neither branch.
+        if (CLIENT_ACK.equals(ackMode)) {
+            synchronized (this) {
+                dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+            }
+        } else if (AUTO_ACK.equals(ackMode)) {
+            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+            protocolConverter.getTransportFilter().sendToActiveMQ(ack);
+        }
+
+        StompCommand command = protocolConverter.convertMessage(message);
+
+        command.setAction(Stomp.Responses.MESSAGE);
+        if (subscriptionId != null) {
+            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
+        }
+
+        protocolConverter.getTransportFilter().sendToStomp(command);
+    }
+
+    /**
+     * Builds the ack for a STOMP ACK frame. The ack covers every message
+     * dispatched up to and including <code>messageId</code>; all of them are
+     * removed from the pending set.
+     *
+     * @param messageId JMS message id named by the STOMP client
+     * @return the ack to send on, or null if this subscription has no
+     *         pending message with that id
+     */
+    synchronized MessageAck onStompMessageAck(String messageId) {
+
+        if (!dispatchedMessage.containsKey(messageId)) {
+            return null;
+        }
+
+        MessageAck ack = new MessageAck();
+        ack.setDestination(consumerInfo.getDestination());
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+
+        int count = 0;
+        for (Iterator iter = dispatchedMessage.keySet().iterator(); iter.hasNext();) {
+
+            String id = (String) iter.next();
+
+            // Look the value up BEFORE removing the entry; once removed,
+            // get(id) returns null and the ack would lose its last message id.
+            MessageId dispatchedId = (MessageId) dispatchedMessage.get(id);
+            if (ack.getFirstMessageId() == null) {
+                ack.setFirstMessageId(dispatchedId);
+            }
+
+            iter.remove();
+            count++;
+            if (id.equals(messageId)) {
+                ack.setLastMessageId(dispatchedId);
+                break;
+            }
+        }
+
+        ack.setMessageCount(count);
+        return ack;
+    }
+
+    public String getAckMode() {
+        return ackMode;
+    }
+
+    /**
+     * @param ackMode one of {@link #AUTO_ACK} or {@link #CLIENT_ACK}; any
+     *        other value makes dispatched messages neither tracked nor acked
+     */
+    public void setAckMode(String ackMode) {
+        this.ackMode = ackMode;
+    }
+
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFactory.java Sat Jul  1 21:18:44 2006
@@ -0,0 +1,40 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.util.Map;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+
+/**
+ * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StompTransportFactory extends TcpTransportFactory {
+
+    protected String getDefaultWireFormatType() {
+        return "stomp";
+    }
+    
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+    	transport = new StompTransportFilter(transport);
+    	return super.compositeConfigure(transport, format, options);
+    }
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompTransportFilter.java Sat Jul  1 21:18:44 2006
@@ -0,0 +1,95 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.util.IOExceptionSupport;
+
+/**
+ * The StompTransportFilter normally sits on top of a TcpTransport
+ * that has been configured with the StompWireFormat and is used to
+ * convert STOMP commands to ActiveMQ commands.
+ * 
+ * All of the coversion work is done by delegating to the ProtocolConverter. 
+ *  
+ * @author <a href="http://hiramchirino.com">chirino</a> 
+ */
+public class StompTransportFilter extends TransportFilter {
+
+    ProtocolConverter protocolConverter = new ProtocolConverter();
+    
+    private final Object sendToActiveMQMutex = new Object();
+    private final Object sendToStompMutex = new Object();
+    
+	public StompTransportFilter(Transport next) {
+		super(next);
+		protocolConverter.setTransportFilter(this);
+	}
+
+	public void start() throws Exception {
+		super.start();
+	}
+	
+	public void stop() throws Exception {
+		super.stop();
+	}
+	
+	public void oneway(Command command) throws IOException {
+        try {
+        	protocolConverter.onActiveMQCommad(command);
+		} catch (JMSException e) {
+			throw IOExceptionSupport.create(e);
+		}
+	}
+	
+	public void onCommand(Command command) {
+        try {
+        	protocolConverter.onStompCommad((StompCommand) command);
+		} catch (IOException e) {
+			onException(e);
+		} catch (JMSException e) {
+			onException(IOExceptionSupport.create(e));
+		}
+	}
+	
+	public void onException(IOException error) {
+		protocolConverter.onStompExcepton(error);
+		transportListener.onException(error);
+	}
+
+
+	public void sendToActiveMQ(Command command) {
+		synchronized(sendToActiveMQMutex) {
+			transportListener.onCommand(command);
+		}
+	}
+	
+	public void sendToStomp(StompCommand command) throws IOException {
+		synchronized(sendToStompMutex) {
+			next.oneway(command);
+		}
+	}
+
+
+	
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormat.java Sat Jul  1 21:18:44 2006
@@ -0,0 +1,200 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.activeio.adapter.PacketInputStream;
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.ByteSequence;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.transport.stomp.Stomp;
+
+/**
+ * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompWireFormat implements WireFormat {
+
+    private static final byte[] NO_DATA = new byte[]{};
+	private static final byte[] END_OF_FRAME = new byte[]{0,'\n'};
+	
+	private static final int MAX_COMMAND_LENGTH = 1024;
+	private static final int MAX_HEADER_LENGTH = 1024*10;
+	private static final int MAX_HEADERS = 1000;
+	private static final int MAX_DATA_LENGTH = 1024*1024*100;
+    
+	private int version=1;
+
+	public Packet marshal(Object command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return new ByteArrayPacket(baos.toByteSequence());
+    }
+
+    public Object unmarshal(Packet packet) throws IOException {
+        PacketInputStream stream = new PacketInputStream(packet);
+        DataInputStream dis = new DataInputStream(stream);
+        return unmarshal(dis);
+    }
+
+    public void marshal(Object command, DataOutputStream os) throws IOException {
+		StompCommand stomp = (org.apache.activemq.transport.stomp2.StompCommand) command;
+
+		StringBuffer buffer = new StringBuffer();
+		buffer.append(stomp.getAction());
+		buffer.append(Stomp.NEWLINE);
+
+		// Output the headers.
+		for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+			Map.Entry entry = (Map.Entry) iter.next();
+			buffer.append(entry.getKey());
+			buffer.append(Stomp.Headers.SEPERATOR);
+			buffer.append(entry.getValue());
+			buffer.append(Stomp.NEWLINE);
+		}
+
+		// Add a newline to seperate the headers from the content.
+		buffer.append(Stomp.NEWLINE);
+
+		os.write(buffer.toString().getBytes("UTF-8"));
+		os.write(stomp.getContent());
+		os.write(END_OF_FRAME);
+	}
+    
+
+    public Object unmarshal(DataInputStream in) throws IOException {
+        	
+        String action = null;
+        
+        // skip white space to next real action line
+		while (true) {
+			action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+			if (action == null) {
+				throw new IOException("connection was closed");
+			} else {
+				action = action.trim();
+				if (action.length() > 0) {
+					break;
+				}
+			}
+		}
+		
+		// Parse the headers
+    	HashMap headers = new HashMap(25);
+        while (true) {
+            String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+            if (line != null && line.trim().length() > 0) {
+            	
+            	if( headers.size() > MAX_HEADERS )
+            		throw new ProtocolException("The maximum number of headers was exceeded");
+            	
+                try {
+                    int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+                    String name = line.substring(0, seperator_index).trim();
+                    String value = line.substring(seperator_index + 1, line.length()).trim();
+                    headers.put(name, value);
+                }
+                catch (Exception e) {
+                    throw new ProtocolException("Unable to parser header line [" + line + "]");
+                }
+            }
+            else {
+                break;
+            }
+        }
+        
+        // Read in the data part.
+        byte[] data = NO_DATA;
+        String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+        if (contentLength!=null) {
+            
+        	// Bless the client, he's telling us how much data to read in.        	
+        	int length;
+			try {
+				length = Integer.parseInt(contentLength.trim());
+			} catch (NumberFormatException e) {
+				throw new ProtocolException("Specified content-length is not a valid integer");
+			}
+
+			if( length > MAX_DATA_LENGTH )
+        		throw new ProtocolException("The maximum data length was exceeded");
+			
+            data = new byte[length];
+            in.readFully(data);
+            
+            if (in.readByte() != 0) {
+                throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte");
+            }
+        
+        } else {
+
+        	// We don't know how much to read.. data ends when we hit a 0
+            byte b;
+            ByteArrayOutputStream baos=null;
+            while ((b = in.readByte()) != 0) {
+    			
+    			if( baos == null ) {
+            		baos = new ByteArrayOutputStream();
+            	} else if( baos.size() > MAX_DATA_LENGTH ) {
+            		throw new ProtocolException("The maximum data length was exceeded");
+            	}
+            
+                baos.write(b);
+            }
+            
+            if( baos!=null ) {
+	            baos.close();
+	            data = baos.toByteArray();
+            }
+            
+        }
+        
+        return new StompCommand(action, headers, data); 
+
+    }
+
+    private String readLine(DataInputStream in, int maxLength, String errorMessage) throws IOException {
+        byte b;
+        ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength);
+        while ((b = in.readByte()) != '\n') {
+        	if( baos.size() > maxLength )
+        		throw new ProtocolException(errorMessage);
+            baos.write(b);
+        }
+        ByteSequence sequence = baos.toByteSequence();
+		return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8");
+	}
+
+	public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java?rev=418550&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp2/StompWireFormatFactory.java Sat Jul  1 21:18:44 2006
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp2;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.command.WireFormatFactory;
+
+/**
+ * Creates WireFormat objects that marshalls the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompWireFormatFactory implements WireFormatFactory {
+    public WireFormat createWireFormat() {
+        return new StompWireFormat();
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp?rev=418550&r1=418549&r2=418550&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp (original)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp Sat Jul  1 21:18:44 2006
@@ -1 +1 @@
-class=org.apache.activemq.transport.stomp.StompTransportFactory
+class=org.apache.activemq.transport.stomp2.StompTransportFactory

Modified: incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp?rev=418550&r1=418549&r2=418550&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp (original)
+++ incubator/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/wireformat/stomp Sat Jul  1 21:18:44 2006
@@ -1 +1 @@
-class=org.apache.activemq.transport.stomp.StompWireFormatFactory
\ No newline at end of file
+class=org.apache.activemq.transport.stomp2.StompWireFormatFactory
\ No newline at end of file



Mime
View raw message