activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r612544 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/transport/stomp/ src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/ src/test/java/org/apache/activemq/transport/stomp/
Date Wed, 16 Jan 2008 19:15:54 GMT
Author: chirino
Date: Wed Jan 16 11:15:50 2008
New Revision: 612544

URL: http://svn.apache.org/viewvc?rev=612544&view=rev
Log:
Applying patch at https://issues.apache.org/activemq/browse/AMQ-943

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java
  (with props)
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java
  (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=612544&r1=612543&r2=612544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Jan 16 11:15:50 2008
@@ -108,6 +108,12 @@
       <artifactId>commons-pool</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>com.thoughtworks.xstream</groupId>
+      <artifactId>xstream</artifactId>
+      <version>1.2.2</version>
+      <optional>true</optional>
+    </dependency>        
 
     <!-- for XML parsing -->
     <dependency>
@@ -169,6 +175,18 @@
       <optional>false</optional>
       <type>test-jar</type>
     </dependency>
+	<dependency>
+		<groupId>org.codehaus.jettison</groupId>
+		<artifactId>jettison</artifactId>
+		<version>1.0-RC1</version>
+		<scope>test</scope>
+	</dependency>	
+	<dependency>
+		<groupId>stax</groupId>
+		<artifactId>stax-api</artifactId>
+		<version>1.0.1</version>
+		<scope>test</scope>
+	</dependency>
 
     <!-- testing camel helpers -->
     <dependency>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=612544&r1=612543&r2=612544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Wed Jan 16 11:15:50 2008
@@ -52,6 +52,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -84,6 +85,7 @@
     private int lastCommandId;
     private final AtomicBoolean connected = new AtomicBoolean(false);
     private final FrameTranslator frameTranslator;
+    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
 
     public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator)
{
         this.transportFilter = stompTransportFilter;
@@ -131,12 +133,26 @@
     protected void sendToStomp(StompFrame command) throws IOException {
         transportFilter.sendToStomp(command);
     }
+    
+    protected FrameTranslator findTranslator(String header) {
+		FrameTranslator translator = frameTranslator;
+		try {
+			if (header != null) {
+				translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
+						.newInstance(header);
+			}
+		} catch (Exception ignore) {
+			// if anything goes wrong use the default translator
+		}
+		
+		return translator;
+	}
 
     /**
-     * Convert a stomp command
-     * 
-     * @param command
-     */
+	 * Convert a stomp command
+	 * 
+	 * @param command
+	 */
     public void onStompCommad(StompFrame command) throws IOException, JMSException {
         try {
 
@@ -340,12 +356,13 @@
 
     protected void onStompSubscribe(StompFrame command) throws ProtocolException {
         checkConnected();
+        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
         Map<String, String> headers = command.getHeaders();
 
         String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
         String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
 
-        ActiveMQDestination actualDest = frameTranslator.convertDestination(this, destination);
+        ActiveMQDestination actualDest = translator.convertDestination(this, destination);
         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
         consumerInfo.setPrefetchSize(1000);
@@ -356,9 +373,9 @@
 
         IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
 
-        consumerInfo.setDestination(frameTranslator.convertDestination(this, destination));
+        consumerInfo.setDestination(translator.convertDestination(this, destination));
 
-        StompSubscription stompSubscription = new StompSubscription(this, subscriptionId,
consumerInfo);
+        StompSubscription stompSubscription = new StompSubscription(this, subscriptionId,
consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
         stompSubscription.setDestination(actualDest);
 
         String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
@@ -380,7 +397,7 @@
         ActiveMQDestination destination = null;
         Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
         if (o != null) {
-            destination = frameTranslator.convertDestination(this, (String)o);
+            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this,
(String)o);
         }
 
         String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
@@ -533,12 +550,12 @@
     }
 
     public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException
{
-        ActiveMQMessage msg = frameTranslator.convertFrame(this, command);
+        ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this,
command);
         return msg;
     }
 
     public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException
{
-        return frameTranslator.convertMessage(this, message);
+        return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this,
message);
     }
 
     public StompTransportFilter getTransportFilter() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=612544&r1=612543&r2=612544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java Wed Jan 16 11:15:50 2008
@@ -48,6 +48,7 @@
         String RECEIPT_REQUESTED = "receipt";
         String TRANSACTION = "transaction";
         String CONTENT_LENGTH = "content-length";
+        String TRANSFORMATION = "transformation";
 
         public interface Response {
             String RECEIPT_ID = "receipt-id";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=612544&r1=612543&r2=612544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Wed Jan 16 11:15:50 2008
@@ -49,11 +49,13 @@
 
     private String ackMode = AUTO_ACK;
     private ActiveMQDestination destination;
+    private String transformation;
 
-    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo
consumerInfo) {
+    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo
consumerInfo, String transformation) {
         this.protocolConverter = stompTransport;
         this.subscriptionId = subscriptionId;
         this.consumerInfo = consumerInfo;
+        this.transformation = transformation;
     }
 
     void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
@@ -68,7 +70,10 @@
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             protocolConverter.getTransportFilter().sendToActiveMQ(ack);
         }
-
+        if (transformation != null) {
+       		message.setReadOnlyProperties(false);
+        	message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
+        }
         StompFrame command = protocolConverter.convertMessage(message);
 
         command.setAction(Stomp.Responses.MESSAGE);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=612544&r1=612543&r2=612544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Wed Jan 16 11:15:50 2008
@@ -30,7 +30,7 @@
 /**
  * The StompTransportFilter normally sits on top of a TcpTransport that has been
  * configured with the StompWireFormat and is used to convert STOMP commands to
- * ActiveMQ commands. All of the coversion work is done by delegating to the
+ * ActiveMQ commands. All of the conversion work is done by delegating to the
  * ProtocolConverter.
  * 
  * @author <a href="http://hiramchirino.com">chirino</a>

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java?rev=612544&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java Wed Jan 16 11:15:50 2008
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.HierarchicalStreamReader;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
+import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
+import com.thoughtworks.xstream.io.xml.XppReader;
+
+/**
+ * Frame translator implementation that uses XStream to convert messages to and from XML
and JSON
+ * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a> 
+ */
+public class XStreamFrameTranslator extends LegacyFrameTranslator {
+
+	XStream xStream = new XStream();
+	
+	public ActiveMQMessage convertFrame(ProtocolConverter converter,
+			StompFrame command) throws JMSException, ProtocolException {
+        Map headers = command.getHeaders();
+        ActiveMQMessage msg;
+        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
+        	msg = super.convertFrame(converter, command);
+        } else {
+        	try {
+        		ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
+        		Object obj = unmarshall(new String(command.getContent(), "UTF-8"), (String)headers.get(Stomp.Headers.TRANSFORMATION));
+        		objMsg.setObject((Serializable)obj);
+        		msg = objMsg;
+        	} catch (Throwable e) {
+        		msg = super.convertFrame(converter, command);
+            }
+        }
+        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command,
msg, this);
+        return msg;
+	}
+
+	public StompFrame convertMessage(ProtocolConverter converter,
+			ActiveMQMessage message) throws IOException, JMSException {
+        if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE)
{
+        	StompFrame command = new StompFrame();
+            command.setAction(Stomp.Responses.MESSAGE);
+            Map<String, String> headers = new HashMap<String, String>(25);
+            command.setHeaders(headers);
+
+            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message,
command, this);
+            ActiveMQObjectMessage msg = (ActiveMQObjectMessage)message.copy();
+            command.setContent(marshall(msg.getObject(), headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8"));
+            return command;
+
+        } else {
+        	return super.convertMessage(converter, message);
+        }
+	}
+	
+    /**
+     * Marshalls the Object to a string using XML or JSON
+     * encoding
+     */
+    protected String marshall(Serializable object, String transformation) throws JMSException
{
+        StringWriter buffer = new StringWriter();
+        HierarchicalStreamWriter out;
+        if (transformation.equalsIgnoreCase("jms-json")) {
+        	out = new JettisonMappedXmlDriver().createWriter(buffer);
+        } else {
+        	out = new PrettyPrintWriter(buffer);
+        }
+        getXStream().marshal(object, out);
+        return buffer.toString();
+    }
+    
+    /**
+     * Unmarshalls the XML or JSON encoded message to an
+     * Object
+     */
+    protected Object unmarshall(String text, String transformation) {
+    	HierarchicalStreamReader in;
+    	if (transformation.equalsIgnoreCase("jms-json")) {
+    		in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+    	} else {
+    		in = new XppReader(new StringReader(text));
+    	}
+        return getXStream().unmarshal(in);
+    }    
+    
+    // Properties
+    // -------------------------------------------------------------------------
+    public XStream getXStream() {
+        if (xStream == null) {
+            xStream = createXStream();
+        }
+        return xStream;
+    }
+
+    public void setXStream(XStream xStream) {
+        this.xStream = xStream;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected XStream createXStream() {
+        return new XStream();
+    }    
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/XStreamFrameTranslator.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json?rev=612544&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json (added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json Wed Jan 16 11:15:50 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.XStreamFrameTranslator
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml?rev=612544&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml (added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml Wed Jan 16 11:15:50 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.XStreamFrameTranslator
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java?rev=612544&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java Wed Jan 16 11:15:50 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.Serializable;
+
+public class SamplePojo implements Serializable {
+    private String name;
+    private String city;
+
+    public SamplePojo() {
+    }
+
+    public SamplePojo(String name, String city) {
+        this.name = name;
+        this.city = city;
+    }
+
+
+    public String getCity() {
+        return city;
+    }
+
+    public void setCity(String city) {
+        this.city = city;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=612544&r1=612543&r2=612544&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Wed Jan 16 11:15:50 2008
@@ -30,19 +30,16 @@
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.security.AuthorizationPlugin;
-import org.apache.activemq.security.SimpleSecurityBrokerSystemTest;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -57,6 +54,15 @@
     private Connection connection;
     private Session session;
     private ActiveMQQueue queue;
+    private String xmlText = "<org.apache.activemq.transport.stomp.SamplePojo>\n" 
+        	+ "  <name>Dejan</name>\n" 
+        	+ "  <city>Belgrade</city>\n" 
+        	+ "</org.apache.activemq.transport.stomp.SamplePojo>";
+    
+    private String jsonText = "{\"org.apache.activemq.transport.stomp.SamplePojo\":{" 
+        + "\"name\":\"Dejan\"," 
+        + "\"city\":\"Belgrade\"" 
+        + "}}";    
     
     protected void setUp() throws Exception {
         broker = BrokerFactory.createBroker(new URI(confUri));
@@ -556,6 +562,172 @@
         assertTrue(f.startsWith("ERROR"));
 
     } 
+    
+    public void testTransformationUnknownTranslator() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:test"
+ "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        TextMessage message = (TextMessage)consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals("Hello World", message.getText());  	
+    }
+    
+    public void testTransformationFailed() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml"
+ "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        TextMessage message = (TextMessage)consumer.receive(1000);
+        assertNotNull(message);
+        assertEquals("Hello World", message.getText());  	
+    }
+    
+    public void testTransformationSendXML() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-xml"
+ "\n\n" + xmlText + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        ObjectMessage message = (ObjectMessage)consumer.receive(1000);
+        assertNotNull(message);
+        SamplePojo object = (SamplePojo)message.getObject();
+        assertEquals("Dejan", object.getName());
+    }       
+    
+    public void testTransformationSendJSON() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:jms-json"
+ "\n\n" + jsonText + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        ObjectMessage message = (ObjectMessage)consumer.receive(1000);
+        assertNotNull(message);
+        SamplePojo object = (SamplePojo)message.getObject();
+        assertEquals("Dejan", object.getName());
+    }
+    
+    public void testTransformationSubscribeXML() throws Exception {
+    	
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
+        ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
+        producer.send(message);
+    	
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:jms-xml" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(xmlText));
+        
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+    
+    public void testTransformationReceiveJSON() throws Exception {
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
+        ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
+        producer.send(message);
+    	
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(jsonText));
+        
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);    	
+    }
+    
+    public void testTransformationReceiveXML() throws Exception {
+    	
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
+        ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
+        message.setStringProperty("transformation", "jms-xml");
+        producer.send(message);
+    	
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto"
+ "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(xmlText));
+        
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }    
+    
+    public void testTransformationNotOverrideSubscription() throws Exception {
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
+        ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
+        message.setStringProperty("transformation", "jms-xml");
+        producer.send(message);
+    	
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:jms-json" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(jsonText));
+        
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);    	
+    }
     
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();



Mime
View raw message