activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r958103 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/ test/java/org/apache/activemq/transport/stomp/
Date Fri, 25 Jun 2010 20:23:57 GMT
Author: tabish
Date: Fri Jun 25 20:23:56 2010
New Revision: 958103

URL: http://svn.apache.org/viewvc?rev=958103&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQ-2098

Added:
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-json
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-xml
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java?rev=958103&r1=958102&r2=958103&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
Fri Jun 25 20:23:56 2010
@@ -25,24 +25,29 @@ import java.util.Map;
 
 import javax.jms.JMSException;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQMapMessage;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.DataStructure;
 import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 
+import com.sun.tools.javac.util.Log;
 import com.thoughtworks.xstream.XStream;
 import com.thoughtworks.xstream.io.HierarchicalStreamReader;
 import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
 import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
+import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
 import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
 import com.thoughtworks.xstream.io.xml.XppReader;
 
 /**
  * Frame translator implementation that uses XStream to convert messages to and
  * from XML and JSON
- * 
+ *
  * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
  */
 public class JmsFrameTranslator extends LegacyFrameTranslator implements
@@ -102,13 +107,20 @@ public class JmsFrameTranslator extends 
 
 			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
 					converter, message, command, this);
-			ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
+
+            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString()))
{
+            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
+            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString()))
{
+            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
+            }
+
+            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
 			command.setContent(marshall(msg.getObject(),
 					headers.get(Stomp.Headers.TRANSFORMATION))
 					.getBytes("UTF-8"));
 			return command;
 
-		} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {

+		} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
 			StompFrame command = new StompFrame();
 			command.setAction(Stomp.Responses.MESSAGE);
 			Map<String, String> headers = new HashMap<String, String>(25);
@@ -116,11 +128,39 @@ public class JmsFrameTranslator extends 
 
 			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
 					converter, message, command, this);
+
+            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString()))
{
+            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
+            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString()))
{
+            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
+            }
+
 			ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
 			command.setContent(marshall((Serializable)msg.getContentMap(),
 					headers.get(Stomp.Headers.TRANSFORMATION))
 					.getBytes("UTF-8"));
-			return command;		
+			return command;
+        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE
&&
+                AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
+
+			StompFrame command = new StompFrame();
+			command.setAction(Stomp.Responses.MESSAGE);
+			Map<String, String> headers = new HashMap<String, String>(25);
+			command.setHeaders(headers);
+
+            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+					converter, message, command, this);
+
+            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString()))
{
+            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
+            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString()))
{
+            	headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
+            }
+
+            String body = marshallAdvisory(message.getDataStructure(),
+            		headers.get(Stomp.Headers.TRANSFORMATION));
+            command.setContent(body.getBytes("UTF-8"));
+            return command;
 		} else {
 			return super.convertMessage(converter, message);
 		}
@@ -148,7 +188,7 @@ public class JmsFrameTranslator extends 
 		objMsg.setObject((Serializable) obj);
 		return objMsg;
 	}
-	
+
 	protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException
{
 		ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
 		Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
@@ -157,8 +197,23 @@ public class JmsFrameTranslator extends 
 		}
 		return mapMsg;
 	}
-	
-	
+
+    protected String marshallAdvisory(final DataStructure ds, String transformation) {
+
+		StringWriter buffer = new StringWriter();
+		HierarchicalStreamWriter out;
+		if (transformation.toLowerCase().endsWith("json")) {
+			out = new JettisonMappedXmlDriver().createWriter(buffer);
+		} else {
+			out = new PrettyPrintWriter(buffer);
+		}
+
+		XStream xstream = getXStream();
+        xstream.setMode(XStream.NO_REFERENCES);
+        xstream.aliasPackage("", "org.apache.activemq.command");
+		xstream.marshal(ds, out);
+		return buffer.toString();
+    }
 
 	// Properties
 	// -------------------------------------------------------------------------

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=958103&r1=958102&r2=958103&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
Fri Jun 25 20:23:56 2010
@@ -23,17 +23,17 @@ import java.util.Map;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.*;
 
 /**
  * Implements ActiveMQ 4.0 translations
  */
 public class LegacyFrameTranslator implements FrameTranslator {
-	
-	
+
+
     public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command)
throws JMSException, ProtocolException {
         final Map headers = command.getHeaders();
         final ActiveMQMessage msg;
@@ -77,6 +77,14 @@ public class LegacyFrameTranslator imple
 
             headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
             command.setContent(data);
+        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE
&&
+                AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
+
+            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+					converter, message, command, this);
+
+            String body = marshallAdvisory(message.getDataStructure());
+            command.setContent(body.getBytes("UTF-8"));
         }
         return command;
     }
@@ -92,7 +100,7 @@ public class LegacyFrameTranslator imple
         if( rc!=null ) {
         	return rc;
         }
-        
+
         StringBuffer buffer = new StringBuffer();
         if (activeMQDestination.isQueue()) {
             if (activeMQDestination.isTemporary()) {
@@ -135,4 +143,16 @@ public class LegacyFrameTranslator imple
                                         + "must begine with one of: /queue/ /topic/ /temp-queue/
/temp-topic/");
         }
     }
+
+    /**
+     * Return an Advisory message as a JSON formatted string
+     * @param ds
+     * @return
+     */
+    protected String marshallAdvisory(final DataStructure ds) {
+        XStream xstream = new XStream(new JsonHierarchicalStreamDriver());
+        xstream.setMode(XStream.NO_REFERENCES);
+        xstream.aliasPackage("", "org.apache.activemq.command");
+        return xstream.toXML(ds);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=958103&r1=958102&r2=958103&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
Fri Jun 25 20:23:56 2010
@@ -117,16 +117,24 @@ public interface Stomp {
             String MESSAGE_ID = "message-id";
         }
     }
-    
+
 	public enum Transformations {
-		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
-		
+		JMS_BYTE,
+		JMS_XML,
+		JMS_JSON,
+		JMS_OBJECT_XML,
+		JMS_OBJECT_JSON,
+		JMS_MAP_XML,
+		JMS_MAP_JSON,
+		JMS_ADVISORY_XML,
+		JMS_ADVISORY_JSON;
+
 		public String toString() {
 			return name().replaceAll("_", "-").toLowerCase();
 		}
-		
+
 		public static Transformations getValue(String value) {
 			return valueOf(value.replaceAll("-", "_").toUpperCase());
 		}
-	}    
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=958103&r1=958102&r2=958103&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Fri Jun 25 20:23:56 2010
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 
 import javax.jms.JMSException;
 
+import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerInfo;
@@ -80,7 +81,7 @@ public class StompSubscription {
 
         boolean ignoreTransformation = false;
 
-        if (transformation != null) {
+        if (transformation != null && !( message instanceof ActiveMQBytesMessage
) ) {
        		message.setReadOnlyProperties(false);
         	message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
         } else {

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-json
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-json?rev=958103&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-json
(added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-json
Fri Jun 25 20:23:56 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.JmsFrameTranslator
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-xml?rev=958103&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-xml
(added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-advisory-xml
Fri Jun 25 20:23:56 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.JmsFrameTranslator
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json?rev=958103&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json
(added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-json
Fri Jun 25 20:23:56 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.JmsFrameTranslator
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml?rev=958103&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
(added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
Fri Jun 25 20:23:56 2010
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.JmsFrameTranslator
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java?rev=958103&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
Fri Jun 25 20:23:56 2010
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.stomp;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.URISupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+import java.io.File;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @version $Revision: 1461 $
+ */
+public class StompAdvisoryTest extends TestCase {
+    private static final Log LOG = LogFactory.getLog(StompAdvisoryTest.class);
+
+    protected ConnectionFactory factory;
+    protected ActiveMQConnection connection;
+    protected BrokerService broker;
+
+    StompConnection stompConnection;
+    URI tcpBrokerUri;
+    URI stompBrokerUri;
+
+    private PolicyEntry createPolicyEntry() {
+        PolicyEntry policy = new PolicyEntry();
+        policy.setAdvisdoryForFastProducers(true);
+        policy.setAdvisoryForConsumed(true);
+        policy.setAdvisoryForDelivery(true);
+        policy.setAdvisoryForDiscardingMessages(true);
+        policy.setAdvisoryForSlowConsumers(true);
+        policy.setAdvisoryWhenFull(true);
+        policy.setProducerFlowControl(false);
+
+        ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
+        strategy.setLimit(10);
+        policy.setPendingMessageLimitStrategy(strategy);
+        return policy;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = BrokerFactory.createBroker(new URI("broker://()/localhost?useJmx=false"));
+
+        broker.setPersistent(false);
+        PolicyEntry policy = new PolicyEntry();
+        policy.setAdvisdoryForFastProducers(true);
+        policy.setAdvisoryForConsumed(true);
+        policy.setAdvisoryForDelivery(true);
+        policy.setAdvisoryForDiscardingMessages(true);
+        policy.setAdvisoryForSlowConsumers(true);
+        policy.setAdvisoryWhenFull(true);
+        policy.setProducerFlowControl(false);
+        ConstantPendingMessageLimitStrategy strategy  = new ConstantPendingMessageLimitStrategy();
+        strategy.setLimit(10);
+        policy.setPendingMessageLimitStrategy(strategy);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("tcp://localhost:0");
+        broker.addConnector("stomp://localhost:0");
+        return broker;
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        if (System.getProperty("basedir") == null) {
+            File file = new File(".");
+            System.setProperty("basedir", file.getAbsolutePath());
+        }
+
+        broker = createBroker();
+        broker.start();
+
+        tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
+        stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
+        LOG.info("Producing using TCP uri: " + tcpBrokerUri);
+        LOG.info("consuming using STOMP uri: " + stompBrokerUri);
+
+        stompConnection = new StompConnection();
+        stompConnection.open(new Socket("localhost", stompBrokerUri.getPort()));
+
+    }
+
+    protected void tearDown() throws Exception {
+        stompConnection.disconnect();
+        stompConnection.close();
+
+    }
+
+    public void testConnectionAdvisory() throws Exception {
+
+        Destination dest = new ActiveMQQueue("testConnectionAdvisory");
+
+        stompConnection.connect("system", "manager");
+        stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection", Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+        // Now connect via openwire and check we get the advisory
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+
+        StompFrame f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertTrue("Should have a body", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
+        Map<String,String> headers = f.getHeaders();
+
+        c.stop();
+        c.close();
+
+        f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertNotNull("Body is not null", f.getBody());
+        assertTrue("Body should have content", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
+    }
+
+    public void testConnectionAdvisoryJSON() throws Exception {
+
+        Destination dest = new ActiveMQQueue("testConnectionAdvisory");
+
+        HashMap<String, String> subheaders = new HashMap<String, String>(1);
+        subheaders.put("transformation", Stomp.Transformations.JMS_JSON.toString());
+
+        stompConnection.connect("system", "manager");
+        stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection",
+        		Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
+
+        // Now connect via openwire and check we get the advisory
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+
+        StompFrame f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertTrue("Should have a body", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
+        Map<String,String> headers = f.getHeaders();
+
+        c.stop();
+        c.close();
+
+        f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertNotNull("Body is not null", f.getBody());
+        assertTrue("Body should have content", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("{\"ConnectionInfo\":"));
+    }
+
+    public void testConnectionAdvisoryXML() throws Exception {
+
+        Destination dest = new ActiveMQQueue("testConnectionAdvisory");
+
+        HashMap<String, String> subheaders = new HashMap<String, String>(1);
+        subheaders.put("transformation", Stomp.Transformations.JMS_XML.toString());
+
+        stompConnection.connect("system", "manager");
+        stompConnection.subscribe("/topic/ActiveMQ.Advisory.Connection",
+        		Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
+
+        // Now connect via openwire and check we get the advisory
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+
+        StompFrame f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertTrue("Should have a body", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("<ConnectionInfo>"));
+        Map<String,String> headers = f.getHeaders();
+
+        c.stop();
+        c.close();
+
+        f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertNotNull("Body is not null", f.getBody());
+        assertTrue("Body should have content", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("<ConnectionInfo>"));
+    }
+
+    public void testConsumerAdvisory() throws Exception {
+
+        Destination dest = new ActiveMQQueue("testConsumerAdvisory");
+
+        stompConnection.connect("system", "manager");
+        stompConnection.subscribe("/topic/ActiveMQ.Advisory.Consumer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+        // Now connect via openwire and check we get the advisory
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+
+        Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(dest);
+
+        StompFrame f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertTrue("Should have a body", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("{\"ConsumerInfo\":"));
+
+        c.stop();
+        c.close();
+    }
+
+    public void testProducerAdvisory() throws Exception {
+
+        Destination dest = new ActiveMQQueue("testProducerAdvisory");
+
+        stompConnection.connect("system", "manager");
+        stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>", Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+        // Now connect via openwire and check we get the advisory
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+
+        Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(dest);
+        Message mess = session.createTextMessage("test");
+        producer.send(mess);
+
+        StompFrame f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertTrue("Should have a body", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("{\"ProducerInfo\":"));
+
+        c.stop();
+        c.close();
+    }
+
+    public void testProducerAdvisoryXML() throws Exception {
+
+        Destination dest = new ActiveMQQueue("testProducerAdvisoryXML");
+
+        HashMap<String, String> subheaders = new HashMap<String, String>(1);
+        subheaders.put("transformation", Stomp.Transformations.JMS_ADVISORY_XML.toString());
+
+        stompConnection.connect("system", "manager");
+        stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>",
+        		Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
+
+        // Now connect via openwire and check we get the advisory
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+
+        Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(dest);
+        Message mess = session.createTextMessage("test");
+        producer.send(mess);
+
+        StompFrame f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertTrue("Should have a body", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("<ProducerInfo>"));
+
+        c.stop();
+        c.close();
+    }
+
+    public void testProducerAdvisoryJSON() throws Exception {
+
+        Destination dest = new ActiveMQQueue("testProducerAdvisoryJSON");
+
+        HashMap<String, String> subheaders = new HashMap<String, String>(1);
+        subheaders.put("transformation", Stomp.Transformations.JMS_ADVISORY_JSON.toString());
+
+        stompConnection.connect("system", "manager");
+        stompConnection.subscribe("/topic/ActiveMQ.Advisory.Producer.>",
+        		Stomp.Headers.Subscribe.AckModeValues.AUTO, subheaders);
+
+        // Now connect via openwire and check we get the advisory
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+
+        Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(dest);
+        Message mess = session.createTextMessage("test");
+        producer.send(mess);
+
+        StompFrame f = stompConnection.receive();
+        LOG.debug(f);
+        assertEquals(f.getAction(),"MESSAGE");
+        assertTrue("Should have a body", f.getBody().length() > 0);
+        assertTrue(f.getBody().startsWith("{\"ProducerInfo\":"));
+
+        c.stop();
+        c.close();
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=958103&r1=958102&r2=958103&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Fri Jun 25 20:23:56 2010
@@ -245,7 +245,7 @@ public class StompTest extends Combinati
         assertEquals("foo", "abc", message.getStringProperty("foo"));
         assertEquals("bar", "123", message.getStringProperty("bar"));
     }
-    
+
     public void testSendMessageWithDelay() throws Exception {
 
         MessageConsumer consumer = session.createConsumer(queue);
@@ -823,6 +823,165 @@ public class StompTest extends Combinati
         stompConnection.sendFrame(frame);
     }
 
+    public void testTransformationReceiveObject() throws Exception {
+
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
+        ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
+        producer.send(message);
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:"	+ Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(xmlObject));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testTransformationReceiveXMLObjectAndMap() throws Exception {
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
+        ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
+        producer.send(objMessage);
+
+        MapMessage mapMessage = session.createMapMessage();
+        mapMessage.setString("name", "Dejan");
+        mapMessage.setString("city", "Belgrade");
+        producer.send(mapMessage);
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:"	+ Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(xmlObject));
+
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(xmlMap.trim()));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testTransformationReceiveJSONObjectAndMap() throws Exception {
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
+        ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
+        producer.send(objMessage);
+
+        MapMessage mapMessage = session.createMapMessage();
+        mapMessage.setString("name", "Dejan");
+        mapMessage.setString("city", "Belgrade");
+        producer.send(mapMessage);
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:"	+ Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(jsonObject));
+
+        frame = stompConnection.receiveFrame();
+
+        assertTrue(frame.trim().endsWith(jsonMap.trim()));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testTransformationSendAndReceiveXmlMap() throws Exception {
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:"
+ Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+
+        assertNotNull(frame);
+        assertTrue(frame.trim().endsWith(xmlMap.trim()));
+        assertTrue(frame.contains("jms-map-xml"));
+    }
+
+    public void testTransformationSendAndReceiveJsonMap() throws Exception {
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:"
+ Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+
+        assertNotNull(frame);
+        assertTrue(frame.trim().endsWith(jsonMap.trim()));
+        assertTrue(frame.contains("jms-map-json"));
+    }
+
+    public void testTransformationReceiveBytesMessage() throws Exception {
+
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
+        BytesMessage message = session.createBytesMessage();
+        message.writeBytes(new byte[]{1, 2, 3, 4, 5});
+        producer.send(message);
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto"
+ "\n" + "transformation:"	+ Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+
+        Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+        Matcher clMmatcher = cl.matcher(frame);
+        assertTrue(clMmatcher.find());
+        assertEquals("5", clMmatcher.group(1));
+
+        assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
     public void testTransformationNotOverrideSubscription() throws Exception {
         MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
         ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));



Mime
View raw message