activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bri...@apache.org
Subject svn commit: r453497 - in /incubator/activemq/trunk: ./ activemq-console/ activemq-core/src/main/java/org/apache/activemq/transport/stomp/ activemq-core/src/test/java/org/apache/activemq/transport/stomp/ activemq-openwire-generator/ activemq-optional/ a...
Date Fri, 06 Oct 2006 04:55:05 GMT
Author: brianm
Date: Thu Oct  5 21:55:04 2006
New Revision: 453497

URL: http://svn.apache.org/viewvc?view=rev&rev=453497
Log:
Initial work on making the stomp frame and destination conversion in and out of activemq pluggable.
The only implementation is the 4.0 style, but adding others is doable now.

Configuration is done on a per-TransportFactory basis right now; this might need some tweaking.
Will discuss on the list.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
Modified:
    incubator/activemq/trunk/   (props changed)
    incubator/activemq/trunk/activemq-console/   (props changed)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java
    incubator/activemq/trunk/activemq-openwire-generator/   (props changed)
    incubator/activemq/trunk/activemq-optional/   (props changed)
    incubator/activemq/trunk/activemq-rar/   (props changed)
    incubator/activemq/trunk/activemq-tooling/   (props changed)
    incubator/activemq/trunk/activemq-tooling/maven-activemq-memtest-plugin/   (props changed)
    incubator/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/   (props changed)
    incubator/activemq/trunk/activemq-tooling/maven-activemq-plugin/   (props changed)
    incubator/activemq/trunk/activemq-web/   (props changed)
    incubator/activemq/trunk/activemq-web-console/   (props changed)
    incubator/activemq/trunk/activemq-web-demo/   (props changed)

Propchange: incubator/activemq/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,3 +1,8 @@
+maven-bundle-plugin
+target
+*.iml
+*.iws
+*.ipr
 xdocs
 activemq-data
 foo

Propchange: incubator/activemq/trunk/activemq-console/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,4 +1,4 @@
-
+*.iml
 target
 .classpath
 .wtpmodules

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?view=auto&rev=453497
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Thu Oct  5 21:55:04 2006
@@ -0,0 +1,109 @@
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Implementations of this interface are used to map back and forth from Stomp to ActiveMQ.
+ * There are several standard mappings which are semantically the same, the inner class,
+ * Helper, provides functions to copy those properties from one to the other
+ */
+public interface FrameTranslator
+{
+    public ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException;
+
+    public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException;
+
+    public String convertDestination(Destination d);
+
+    public ActiveMQDestination convertDestination(String name) throws ProtocolException;
+
+    /**
+     * Helper class which holds commonly needed functions used when implementing
+     * FrameTranslators
+     */
+    public final static class Helper
+    {
+        public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message,
+                                                                 StompFrame command,
+                                                                 FrameTranslator ft)
+                throws IOException
+        {
+            final Map headers = command.getHeaders();
+            headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(message.getDestination()));
+            headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
+
+            if (message.getJMSCorrelationID() != null) {
+                headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
+            }
+            headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
+
+            if (message.getJMSRedelivered()) {
+                headers.put(Stomp.Headers.Message.REDELIVERED, "true");
+            }
+            headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
+
+            if (message.getJMSReplyTo() != null) {
+                headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(message.getJMSReplyTo()));
+            }
+            headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
+
+            if (message.getJMSType() != null) {
+                headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
+            }
+
+            // now lets add all the message headers
+            final Map properties = message.getProperties();
+            if (properties != null) {
+                headers.putAll(properties);
+            }
+        }
+
+        public static void copyStandardHeadersFromFrameToMessage(StompFrame command,
+                                                                 ActiveMQMessage msg,
+                                                                 FrameTranslator ft)
+                throws ProtocolException, JMSException
+        {
+            final Map headers = new HashMap(command.getHeaders());
+            final String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
+            msg.setDestination( ft.convertDestination(destination));
+
+            // the standard JMS headers
+            msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+
+            Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
+            if (o != null) {
+                msg.setJMSExpiration(Long.parseLong((String) o));
+            }
+
+            o = headers.remove(Stomp.Headers.Send.PRIORITY);
+            if (o != null) {
+                msg.setJMSPriority(Integer.parseInt((String) o));
+            }
+
+            o = headers.remove(Stomp.Headers.Send.TYPE);
+            if (o != null) {
+                msg.setJMSType((String) o);
+            }
+
+            o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+            if (o != null) {
+                msg.setJMSReplyTo(ft.convertDestination((String) o));
+            }
+
+            o = headers.remove(Stomp.Headers.Send.PERSISTENT);
+            if (o != null) {
+                msg.setPersistent("true".equals(o));
+            }
+
+            // now the general headers
+            msg.setProperties(headers);
+        }
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?view=auto&rev=453497
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Thu Oct  5 21:55:04 2006
@@ -0,0 +1,101 @@
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import java.util.Map;
+import java.util.HashMap;
+import java.io.IOException;
+
+/**
+ * Implements ActiveMQ 4.0 translations
+ */
+public class LegacyFrameTranslator implements FrameTranslator
+{
+    public ActiveMQMessage convertFrame(StompFrame command) throws JMSException, ProtocolException {
+        final Map headers = command.getHeaders();
+        final ActiveMQMessage msg;
+        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
+            headers.remove(Stomp.Headers.CONTENT_LENGTH);
+            ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
+            bm.writeBytes(command.getContent());
+            msg = bm;
+        } else {
+            ActiveMQTextMessage text = new ActiveMQTextMessage();
+            try {
+                text.setText(new String(command.getContent(), "UTF-8"));
+            }
+            catch (Throwable e) {
+                throw new ProtocolException("Text could not bet set: " + e, false, e);
+            }
+            msg = text;
+        }
+        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(command, msg, this);
+        return msg;
+    }
+
+    public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+        StompFrame command = new StompFrame();
+		command.setAction(Stomp.Responses.MESSAGE);
+        Map headers = new HashMap(25);
+        command.setHeaders(headers);
+
+        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(message, command, this);
+
+        if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
+
+            ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+            command.setContent(msg.getText().getBytes("UTF-8"));
+
+        } else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
+
+        	ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+            byte[] data = new byte[(int)msg.getBodyLength()];
+            msg.readBytes(data);
+
+            headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
+            command.setContent(data);
+        }
+        return command;
+    }
+
+    public String convertDestination(Destination d) {
+        if (d == null) {
+            return null;
+        }
+        ActiveMQDestination amq_d = (ActiveMQDestination) d;
+        String p_name = amq_d.getPhysicalName();
+
+        StringBuffer buffer = new StringBuffer();
+        if (amq_d.isQueue()) {
+            buffer.append("/queue/");
+        }
+        if (amq_d.isTopic()) {
+            buffer.append("/topic/");
+        }
+        buffer.append(p_name);
+        return buffer.toString();
+    }
+
+    public ActiveMQDestination convertDestination(String name) throws ProtocolException {
+        if (name == null) {
+            return null;
+        }
+        else if (name.startsWith("/queue/")) {
+            String q_name = name.substring("/queue/".length(), name.length());
+            return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
+        }
+        else if (name.startsWith("/topic/")) {
+            String t_name = name.substring("/topic/".length(), name.length());
+            return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
+        }
+        else {
+            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " +
+                                        "must begine with /queue/ or /topic/");
+        }
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?view=diff&rev=453497&r1=453496&r2=453497
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Oct  5 21:55:04 2006
@@ -57,28 +57,35 @@
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * 
- * @author <a href="http://hiramchirino.com">chirino</a> 
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class ProtocolConverter {
-	
+
     private static final IdGenerator connectionIdGenerator = new IdGenerator();
     private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
     private final SessionId sessionId = new SessionId(connectionId, -1);
     private final ProducerId producerId = new ProducerId(sessionId, 1);
-    
+
     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
-	
+
     private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap();
     private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap();
     private final Map transactions = new ConcurrentHashMap();
-	private StompTransportFilter transportFilter;
-	
+	private final StompTransportFilter transportFilter;
+
 	private final Object commnadIdMutex = new Object();
 	private int lastCommandId;
     private final AtomicBoolean connected = new AtomicBoolean(false);
+    private final FrameTranslator frameTranslator;
+
+    public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator)
+    {
+        this.transportFilter = stompTransportFilter;
+        this.frameTranslator = translator;
+    }
 
     protected int generateCommandId() {
     	synchronized(commnadIdMutex){
@@ -102,7 +109,7 @@
 	    }
     	return null;
     }
-    
+
 	protected void sendToActiveMQ(Command command, ResponseHandler handler) {
 		command.setCommandId(generateCommandId());
 		if(handler!=null) {
@@ -122,11 +129,11 @@
      */
 	public void onStompCommad( StompFrame command ) throws IOException, JMSException {
 		try {
-			
+
 			if( command.getClass() == StompFrameError.class ) {
 				throw ((StompFrameError)command).getException();
 			}
-			
+
 			String action = command.getAction();
 	        if (action.startsWith(Stomp.Commands.SEND))
 	            onStompSend(command);
@@ -148,9 +155,9 @@
 	            onStompDisconnect(command);
 	        else
 	        	throw new ProtocolException("Unknown STOMP action: "+action);
-	        
+
         } catch (ProtocolException e) {
-        	
+
         	// Let the stomp client know about any protocol errors.
         	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         	PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,"UTF-8"));
@@ -159,20 +166,20 @@
 
         	HashMap headers = new HashMap();
         	headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-        	
+
             final String receiptId = (String) command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-            if( receiptId != null ) {            	
+            if( receiptId != null ) {
             	headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
             }
-        	
+
         	StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR,headers,baos.toByteArray());
 			sendToStomp(errorMessage);
-			
+
 			if( e.isFatal() )
 				getTransportFilter().onException(e);
         }
 	}
-	
+
 	protected void onStompSend(StompFrame command) throws IOException, JMSException {
 		checkConnected();
 
@@ -192,12 +199,12 @@
                 throw new ProtocolException("Invalid transaction id: "+stompTx);
             message.setTransactionId(activemqTx);
         }
-		
+
         message.onSend();
 		sendToActiveMQ(message, createResponseHandler(command));
-		
+
 	}
-	
+
 
     protected void onStompAck(StompFrame command) throws ProtocolException {
 		checkConnected();
@@ -205,7 +212,7 @@
     	// TODO: acking with just a message id is very bogus
     	// since the same message id could have been sent to 2 different subscriptions
     	// on the same stomp connection. For example, when 2 subs are created on the same topic.
-    	
+
     	Map headers = command.getHeaders();
         String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
         if (messageId == null)
@@ -230,98 +237,94 @@
 		        break;
 			}
 		}
-        
+
         if( !acked )
         	throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
 
 	}
-    
+
 
 	protected void onStompBegin(StompFrame command) throws ProtocolException {
 		checkConnected();
 
 		Map headers = command.getHeaders();
-		
+
         String stompTx = (String)headers.get(Stomp.Headers.TRANSACTION);
-        
+
         if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
             throw new ProtocolException("Must specify the transaction you are beginning");
         }
-        
+
         if( transactions.get(stompTx)!=null  ) {
             throw new ProtocolException("The transaction was allready started: "+stompTx);
         }
-        
+
         LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
         transactions.put(stompTx, activemqTx);
-        
+
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.BEGIN);
-        
+
 		sendToActiveMQ(tx, createResponseHandler(command));
-		
+
 	}
-	
+
 	protected void onStompCommit(StompFrame command) throws ProtocolException {
 		checkConnected();
 
 		Map headers = command.getHeaders();
-		
+
         String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
         if (stompTx==null) {
             throw new ProtocolException("Must specify the transaction you are committing");
         }
-        
-        TransactionId activemqTx=null;
-        if (stompTx!=null) {
-        	activemqTx = (TransactionId) transactions.remove(stompTx);
-            if (activemqTx == null)
-                throw new ProtocolException("Invalid transaction id: "+stompTx);
+
+        TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
+        if (activemqTx == null) {
+            throw new ProtocolException("Invalid transaction id: "+stompTx);
         }
 
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
-		
+
 		sendToActiveMQ(tx, createResponseHandler(command));
 	}
 
 	protected void onStompAbort(StompFrame command) throws ProtocolException {
 		checkConnected();
     	Map headers = command.getHeaders();
-		
+
         String stompTx = (String) headers.get(Stomp.Headers.TRANSACTION);
         if (stompTx==null) {
             throw new ProtocolException("Must specify the transaction you are committing");
         }
-        
-        TransactionId activemqTx=null;
-        if (stompTx!=null) {
-        	activemqTx = (TransactionId) transactions.remove(stompTx);
-            if (activemqTx == null)
-                throw new ProtocolException("Invalid transaction id: "+stompTx);
+
+        TransactionId activemqTx = (TransactionId) transactions.remove(stompTx);
+        if (activemqTx == null) {
+            throw new ProtocolException("Invalid transaction id: "+stompTx);
         }
 
         TransactionInfo tx = new TransactionInfo();
         tx.setConnectionId(connectionId);
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.ROLLBACK);
-		
+
 		sendToActiveMQ(tx, createResponseHandler(command));
-		
+
 	}
 
 	protected void onStompSubscribe(StompFrame command) throws ProtocolException {
 		checkConnected();
     	Map headers = command.getHeaders();
-        
+
         String subscriptionId = (String)headers.get(Stomp.Headers.Subscribe.ID);
         String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
-        
-        ActiveMQDestination actual_dest = convertDestination(destination);
+
+        ActiveMQDestination actual_dest = frameTranslator.convertDestination(destination);
         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
         consumerInfo.setPrefetchSize(1000);
@@ -329,14 +332,14 @@
 
         String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
         consumerInfo.setSelector(selector);
-        
+
         IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
-        
-        consumerInfo.setDestination(convertDestination(destination));
-                
+
+        consumerInfo.setDestination(frameTranslator.convertDestination(destination));
+
         StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
         stompSubscription.setDestination(actual_dest);
-        
+
         String ackMode = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
         if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
             stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
@@ -346,7 +349,7 @@
 
         subscriptionsByConsumerId.put(id, stompSubscription);
 		sendToActiveMQ(consumerInfo, createResponseHandler(command));
-		
+
 	}
 
 	protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@@ -355,11 +358,11 @@
 
         ActiveMQDestination destination=null;
         Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
-        if( o!=null ) 
-        	destination =convertDestination((String) o);
-        
+        if( o!=null )
+        	destination = frameTranslator.convertDestination((String) o);
+
         String subscriptionId = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
-        
+
         if (subscriptionId==null && destination==null) {
             throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
         }
@@ -369,7 +372,7 @@
         //
         for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
 			StompSubscription sub = (StompSubscription) iter.next();
-			if ( 
+			if (
 				(subscriptionId!=null && subscriptionId.equals(sub.getSubscriptionId()) ) ||
 				(destination!=null && destination.equals(sub.getDestination()) )
 			) {
@@ -377,7 +380,7 @@
 				return;
 			}
 		}
-        
+
         throw new ProtocolException("No subscription matched.");
 	}
 
@@ -388,56 +391,56 @@
 		}
 
     	final Map headers = command.getHeaders();
-        
+
         // allow anyone to login for now
         String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
         String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
         String clientId = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
-        
+
         final ConnectionInfo connectionInfo = new ConnectionInfo();
-        
+
         IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
-        
+
         connectionInfo.setConnectionId(connectionId);
         if( clientId!=null )
             connectionInfo.setClientId(clientId);
         else
             connectionInfo.setClientId(""+connectionInfo.getConnectionId().toString());
-        
+
         connectionInfo.setResponseRequired(true);
         connectionInfo.setUserName(login);
         connectionInfo.setPassword(passcode);
 
 		sendToActiveMQ(connectionInfo, new ResponseHandler(){
 			public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-					            
+
 	            final SessionInfo sessionInfo = new SessionInfo(sessionId);
 	            sendToActiveMQ(sessionInfo,null);
-	            
-	            
+
+
 	            final ProducerInfo producerInfo = new ProducerInfo(producerId);
 	            sendToActiveMQ(producerInfo,new ResponseHandler(){
 					public void onResponse(ProtocolConverter converter, Response response) throws IOException {
-						
+
 						connected.set(true);
 	                    HashMap responseHeaders = new HashMap();
-	                    
+
 	                    responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
 	                    String requestId = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
 	                    if( requestId !=null ){
 		                    responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
 	            		}
-	                    
+
 	                    StompFrame sc = new StompFrame();
 	                    sc.setAction(Stomp.Responses.CONNECTED);
 	                    sc.setHeaders(responseHeaders);
 	                    sendToStomp(sc);
 					}
 				});
-	            
+
 			}
 		});
-		
+
 	}
 
 	protected void onStompDisconnect(StompFrame command) throws ProtocolException {
@@ -454,182 +457,42 @@
 	}
 
 	/**
-     * Convert a ActiveMQ command
+     * Dispatch a ActiveMQ command
      * @param command
-     * @throws IOException 
+     * @throws IOException
      */
 	public void onActiveMQCommad( Command command ) throws IOException, JMSException {
-		
+
     	if ( command.isResponse() ) {
-		    
+
 			Response response = (Response) command;
 		    ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer(response.getCorrelationId()));
 		    if( rh !=null ) {
 		    	rh.onResponse(this, response);
 		    }
-		    
+
 		} else if( command.isMessageDispatch() ) {
-			
+
 		    MessageDispatch md = (MessageDispatch)command;
 		    StompSubscription sub = (StompSubscription) subscriptionsByConsumerId.get(md.getConsumerId());
-		    if (sub != null)
+		    if (sub != null) {
 		        sub.onMessageDispatch(md);
-		    
-		}
-		
-	}
-
-	public  ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
-		Map headers = command.getHeaders();
-        
-        // now the body
-        ActiveMQMessage msg;
-        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
-            headers.remove(Stomp.Headers.CONTENT_LENGTH);
-            ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
-            bm.writeBytes(command.getContent());
-            msg = bm;
-        } else {
-            ActiveMQTextMessage text = new ActiveMQTextMessage();
-            try {
-				text.setText(new String(command.getContent(), "UTF-8"));
-			} catch (Throwable e) {
-				throw new ProtocolException("Text could not bet set: "+e, false, e);
-			}
-            msg = text;
-        }
-
-        String destination = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
-        msg.setDestination(convertDestination(destination));
-
-        // the standard JMS headers
-        msg.setJMSCorrelationID((String) headers.remove(Stomp.Headers.Send.CORRELATION_ID));
-
-        Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
-        if (o != null) {
-            msg.setJMSExpiration(Long.parseLong((String) o));
-        }
-        
-        o = headers.remove(Stomp.Headers.Send.PRIORITY);
-        if (o != null) {
-            msg.setJMSPriority(Integer.parseInt((String)o));
-        }
-        
-        o = headers.remove(Stomp.Headers.Send.TYPE);
-        if (o != null) {
-            msg.setJMSType((String) o);
-        }
-        
-        o = headers.remove(Stomp.Headers.Send.REPLY_TO);
-        if( o!=null ) {
-        	msg.setJMSReplyTo(convertDestination((String)o));
-        }
-
-        o = headers.remove(Stomp.Headers.Send.PERSISTENT);
-        if (o != null) {
-            msg.setPersistent("true".equals(o));
+            }
         }
-        
-        // now the general headers
-        msg.setProperties(headers);
-        
-        return msg;        
 	}
-	
-	public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
-
-		StompFrame command = new StompFrame();
-		command.setAction(Stomp.Responses.MESSAGE);
-		
-		HashMap headers = new HashMap();
-		command.setHeaders(headers);
-		
-        headers.put(Stomp.Headers.Message.DESTINATION, convertDestination(message.getDestination()));
-        headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
-        if (message.getJMSCorrelationID() != null) {
-            headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
-        }
-        headers.put(Stomp.Headers.Message.EXPIRATION_TIME, ""+message.getJMSExpiration());
-        if (message.getJMSRedelivered()) {
-            headers.put(Stomp.Headers.Message.REDELIVERED, "true");
-        }
-        headers.put(Stomp.Headers.Message.PRORITY, ""+message.getJMSPriority());
-        if (message.getJMSReplyTo() != null) {
-            headers.put(Stomp.Headers.Message.REPLY_TO, convertDestination(message.getJMSReplyTo()));
-        }
-        headers.put(Stomp.Headers.Message.TIMESTAMP, ""+message.getJMSTimestamp());
-        if (message.getJMSType() != null) {
-            headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
-        }
-
-        // now lets add all the message headers
-        Map properties = message.getProperties();
-        if (properties != null) {
-            headers.putAll(properties);
-        }
-        
-        if( message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
-        	
-            ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
-            command.setContent(msg.getText().getBytes("UTF-8"));
-            
-        } else if( message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
-            
-        	ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
-            byte[] data = new byte[(int)msg.getBodyLength()]; 
-            msg.readBytes(data);
-
-            headers.put(Stomp.Headers.CONTENT_LENGTH, ""+data.length);
-            command.setContent(data);
-            
-        }
-
-        return command;		
-	}
-	
-    protected ActiveMQDestination convertDestination(String name) throws ProtocolException {
-        if (name == null) {
-            return null;
-        }
-        else if (name.startsWith("/queue/")) {
-            String q_name = name.substring("/queue/".length(), name.length());
-            return ActiveMQDestination.createDestination(q_name, ActiveMQDestination.QUEUE_TYPE);
-        }
-        else if (name.startsWith("/topic/")) {
-            String t_name = name.substring("/topic/".length(), name.length());
-            return ActiveMQDestination.createDestination(t_name, ActiveMQDestination.TOPIC_TYPE);
-        }
-        else {
-            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + "must begine with /queue/ or /topic/");
-        }
 
-    }
+    public  ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
 
-    protected String convertDestination(Destination d) {
-        if (d == null) {
-            return null;
-        }
-        ActiveMQDestination amq_d = (ActiveMQDestination) d;
-        String p_name = amq_d.getPhysicalName();
+        ActiveMQMessage msg = frameTranslator.convertFrame(command);
 
-        StringBuffer buffer = new StringBuffer();
-        if (amq_d.isQueue()) {
-            buffer.append("/queue/");
-        }
-        if (amq_d.isTopic()) {
-            buffer.append("/topic/");
-        }
-        buffer.append(p_name);
+        return msg;
+    }
 
-        return buffer.toString();
+	public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+		return frameTranslator.convertMessage(message);
     }
 
 	public StompTransportFilter getTransportFilter() {
 		return transportFilter;
 	}
-
-	public void setTransportFilter(StompTransportFilter transportFilter) {
-		this.transportFilter = transportFilter;
-	}
-	
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?view=diff&rev=453497&r1=453496&r2=453497
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Thu Oct  5 21:55:04 2006
@@ -25,7 +25,7 @@
 
 /**
  * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
- * 
+ *
  * @version $Revision: 1.1.1.1 $
  */
 public class StompTransportFactory extends TcpTransportFactory {
@@ -33,9 +33,9 @@
     protected String getDefaultWireFormatType() {
         return "stomp";
     }
-    
+
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-    	transport = new StompTransportFilter(transport);
+    	transport = new StompTransportFilter(transport, new LegacyFrameTranslator());
     	return super.compositeConfigure(transport, format, options);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?view=diff&rev=453497&r1=453496&r2=453497
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Thu Oct  5 21:55:04 2006
@@ -30,22 +30,25 @@
  * The StompTransportFilter normally sits on top of a TcpTransport
  * that has been configured with the StompWireFormat and is used to
  * convert STOMP commands to ActiveMQ commands.
- * 
- * All of the coversion work is done by delegating to the ProtocolConverter. 
- *  
- * @author <a href="http://hiramchirino.com">chirino</a> 
+ *
+ * All of the coversion work is done by delegating to the ProtocolConverter.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class StompTransportFilter extends TransportFilter {
 
-    ProtocolConverter protocolConverter = new ProtocolConverter();
-    
+    private final ProtocolConverter protocolConverter;
+
     private final Object sendToActiveMQMutex = new Object();
     private final Object sendToStompMutex = new Object();
-    
-	public StompTransportFilter(Transport next) {
+
+    private final FrameTranslator frameTranslator;
+
+    public StompTransportFilter(Transport next, FrameTranslator translator) {
 		super(next);
-		protocolConverter.setTransportFilter(this);
-	}
+        this.frameTranslator = translator;
+        this.protocolConverter = new ProtocolConverter(this, translator);
+    }
 
 	public void oneway(Command command) throws IOException {
         try {
@@ -54,7 +57,7 @@
 			throw IOExceptionSupport.create(e);
 		}
 	}
-	
+
 	public void onCommand(Command command) {
         try {
         	protocolConverter.onStompCommad((StompFrame) command);
@@ -64,17 +67,21 @@
 			onException(IOExceptionSupport.create(e));
 		}
 	}
-	
+
 	public void sendToActiveMQ(Command command) {
 		synchronized(sendToActiveMQMutex) {
 			transportListener.onCommand(command);
 		}
 	}
-	
+
 	public void sendToStomp(StompFrame command) throws IOException {
 		synchronized(sendToStompMutex) {
 			next.oneway(command);
 		}
 	}
 
+    public FrameTranslator getFrameTranslator()
+    {
+        return frameTranslator;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java?view=diff&rev=453497&r1=453496&r2=453497
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java Thu Oct  5 21:55:04 2006
@@ -40,7 +40,7 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * 
+ *
  * @version $Revision$
  */
 public class StompSubscriptionRemoveTest extends TestCase {
@@ -49,10 +49,7 @@
     private Socket stompSocket;
     private ByteArrayOutputStream inputBuffer;
 
-    /**
-     * @param args
-     * @throws Exception
-     */
+    
     public void testRemoveSubscriber() throws Exception {
         BrokerService broker = new BrokerService();
         broker.setPersistent(false);
@@ -115,7 +112,7 @@
             ++messagesCount;
             ++count;
         }
-        
+
         sendFrame("DISCONNECT\n\n");
         Thread.sleep(1000);
         stompSocket.close();
@@ -127,7 +124,7 @@
         sendFrame(connect_frame);
 
         f = receiveFrame(5000);
-        
+
         frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n";
         sendFrame(frame);
         try {
@@ -147,7 +144,7 @@
                         }
                     }
                 }
-                                
+
                 line = input.readLine();
                 if (line == null) {
                     throw new IOException("connection was closed");
@@ -166,11 +163,11 @@
         catch (IOException ex) {
             ex.printStackTrace();
         }
-                
+
         sendFrame("DISCONNECT\n\n");
         stompSocket.close();
         broker.stop();
-        
+
         log.info("Total messages received: " + messagesCount);
         assertTrue("Messages received after connection loss: " + messagesCount, messagesCount >= 2000);
 

Propchange: incubator/activemq/trunk/activemq-openwire-generator/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,4 +1,4 @@
-
+*.iml
 .settings
 eclipse-classes
 target

Propchange: incubator/activemq/trunk/activemq-optional/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,4 +1,4 @@
-
+*.iml
 .project
 .classpath
 target

Propchange: incubator/activemq/trunk/activemq-rar/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,4 +1,4 @@
-
+*.iml
 target
 .settings
 .project

Propchange: incubator/activemq/trunk/activemq-tooling/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -0,0 +1 @@
+*.iml

Propchange: incubator/activemq/trunk/activemq-tooling/maven-activemq-memtest-plugin/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -0,0 +1,2 @@
+*.iml
+target

Propchange: incubator/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -0,0 +1,2 @@
+target
+*.iml

Propchange: incubator/activemq/trunk/activemq-tooling/maven-activemq-plugin/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,4 +1,4 @@
-
+*.iml
 .settings
 eclipse-classes
 target

Propchange: incubator/activemq/trunk/activemq-web/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,4 +1,4 @@
-
+*.iml
 target
 .project
 .classpath

Propchange: incubator/activemq/trunk/activemq-web-console/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,4 +1,4 @@
-
+*.iml
 activemq-data
 target
 .settings

Propchange: incubator/activemq/trunk/activemq-web-demo/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Oct  5 21:55:04 2006
@@ -1,4 +1,5 @@
-
+*.iml
+*.ipr
 target
 activemq-data
 .settings



Mime
View raw message