activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1187001 - in /activemq/trunk/activemq-optional/src: main/java/org/apache/activemq/transport/util/ main/java/org/apache/activemq/transport/xstream/ test/java/org/apache/activemq/transport/http/
Date Thu, 20 Oct 2011 19:21:55 GMT
Author: tabish
Date: Thu Oct 20 19:21:55 2011
New Revision: 1187001

URL: http://svn.apache.org/viewvc?rev=1187001&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3556

Modified:
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/util/TextWireFormat.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/xstream/XStreamWireFormat.java
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/util/TextWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/util/TextWireFormat.java?rev=1187001&r1=1187000&r2=1187001&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/util/TextWireFormat.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/util/TextWireFormat.java
Thu Oct 20 19:21:55 2011
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Reader;
+
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
@@ -29,16 +30,16 @@ import org.apache.activemq.wireformat.Wi
 
 /**
  * Adds the extra methods available to text based wire format implementations
- * 
- * 
+ *
+ *
  */
 public abstract class TextWireFormat implements WireFormat {
 
-    public abstract Object unmarshalText(String text);
+    public abstract Object unmarshalText(String text) throws IOException;
 
-    public abstract Object unmarshalText(Reader reader);
+    public abstract Object unmarshalText(Reader reader) throws IOException;
 
-    public abstract String marshalText(Object command);
+    public abstract String marshalText(Object command) throws IOException;
 
     public void marshal(Object command, DataOutput out) throws IOException {
         String text = marshalText(command);

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/xstream/XStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/xstream/XStreamWireFormat.java?rev=1187001&r1=1187000&r2=1187001&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/xstream/XStreamWireFormat.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/xstream/XStreamWireFormat.java
Thu Oct 20 19:21:55 2011
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.transport.xstream;
 
+import java.io.IOException;
 import java.io.Reader;
 
 import com.thoughtworks.xstream.XStream;
 
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.MarshallAware;
+import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -28,8 +31,8 @@ import org.apache.activemq.wireformat.Wi
  * A {@link WireFormat} implementation which uses the <a
  * href="http://xstream.codehaus.org/>XStream</a> library to marshall commands
  * onto the wire
- * 
- * 
+ *
+ *
  */
 public class XStreamWireFormat extends TextWireFormat {
     private XStream xStream;
@@ -55,13 +58,22 @@ public class XStreamWireFormat extends T
         return (Command)getXStream().fromXML(reader);
     }
 
-    public String marshalText(Object command) {
+    public String marshalText(Object command) throws IOException {
+        if (command instanceof MarshallAware) {
+            ((MarshallAware)command).beforeMarshall(this);
+        } else if(command instanceof MessageDispatch) {
+            MessageDispatch dispatch = (MessageDispatch) command;
+            if (dispatch != null && dispatch.getMessage() != null) {
+                dispatch.getMessage().beforeMarshall(this);
+            }
+        }
+
         return getXStream().toXML(command);
     }
 
     /**
      * Can this wireformat process packets of this version
-     * 
+     *
      * @param version the version number to test
      * @return true if can accept the version
      */

Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java?rev=1187001&r1=1187000&r2=1187001&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java
(original)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java
Thu Oct 20 19:21:55 2011
@@ -63,76 +63,19 @@ public class HttpSendCompressedMessagesT
 
     private static final String destinationName = "HttpCompressionTopic";
 
-    private void sendTextMessage(boolean compressed) throws Exception {
-
-        StringBuilder builder = new StringBuilder();
-        for(int i = 0; i < 10; ++i) {
-            builder.append(UUID.randomUUID().toString());
-        }
-
-        ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection();
-        connection.setUseCompression(compressed);
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic destination = session.createTopic(destinationName);
-        MessageProducer producer = session.createProducer(destination);
-        producer.send(session.createTextMessage(builder.toString()));
-    }
-
-    private void sendBytesMessage(boolean compressed) throws Exception {
-
-        StringBuilder builder = new StringBuilder();
-        for(int i = 0; i < 10; ++i) {
-            builder.append(UUID.randomUUID().toString());
-        }
-
-        ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection();
-        connection.setUseCompression(compressed);
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic destination = session.createTopic(destinationName);
-        MessageProducer producer = session.createProducer(destination);
-        BytesMessage message = session.createBytesMessage();
-        message.writeUTF(builder.toString());
-        producer.send(message);
-    }
-
-    private void sendStreamMessage(boolean compressed) throws Exception {
-
-        StringBuilder builder = new StringBuilder();
-        for(int i = 0; i < 10; ++i) {
-            builder.append(UUID.randomUUID().toString());
-        }
-
-        ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection();
-        connection.setUseCompression(compressed);
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic destination = session.createTopic(destinationName);
-        MessageProducer producer = session.createProducer(destination);
-        StreamMessage message = session.createStreamMessage();
-        message.writeString(builder.toString());
-        producer.send(message);
-    }
-
-    private void sendMapMessage(boolean compressed) throws Exception {
-
-        StringBuilder builder = new StringBuilder();
-        for(int i = 0; i < 10; ++i) {
-            builder.append(UUID.randomUUID().toString());
-        }
-
-        ActiveMQConnection connection = (ActiveMQConnection) tcpConnectionFactory.createConnection();
-        connection.setUseCompression(compressed);
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic destination = session.createTopic(destinationName);
-        MessageProducer producer = session.createProducer(destination);
-        MapMessage message = session.createMapMessage();
-        message.setString("content", builder.toString());
-        producer.send(message);
+    @Test
+    public void testTextMessageCompressionFromTcp() throws Exception {
+        sendTextMessage(true);
+        doTestTextMessageCompression();
     }
 
     @Test
-    public void testTextMessageCompression() throws Exception {
-        sendTextMessage(true);
+    public void testTextMessageCompressionFromHttp() throws Exception {
+        sendTextMessage(httpConnectionFactory, true);
+        doTestTextMessageCompression();
+    }
 
+    private void doTestTextMessageCompression() throws Exception {
         ActiveMQTextMessage tcpMessage = (ActiveMQTextMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
         ActiveMQTextMessage httpMessage = (ActiveMQTextMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
 
@@ -168,9 +111,18 @@ public class HttpSendCompressedMessagesT
     }
 
     @Test
-    public void testBytesMessageCompression() throws Exception {
+    public void testBytesMessageCompressionFromTcp() throws Exception {
         sendBytesMessage(true);
+        doTestBytesMessageCompression();
+    }
 
+    @Test
+    public void testBytesMessageCompressionFromHttp() throws Exception {
+        sendBytesMessage(httpConnectionFactory, true);
+        doTestBytesMessageCompression();
+    }
+
+    private void doTestBytesMessageCompression() throws Exception {
         ActiveMQBytesMessage tcpMessage = (ActiveMQBytesMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
         ActiveMQBytesMessage httpMessage = (ActiveMQBytesMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
 
@@ -206,9 +158,18 @@ public class HttpSendCompressedMessagesT
     }
 
     @Test
-    public void testStreamMessageCompression() throws Exception {
+    public void testStreamMessageCompressionFromTcp() throws Exception {
         sendStreamMessage(true);
+        doTestStreamMessageCompression();
+    }
 
+    @Test
+    public void testStreamMessageCompressionFromHttp() throws Exception {
+        sendStreamMessage(httpConnectionFactory, true);
+        doTestStreamMessageCompression();
+    }
+
+    private void doTestStreamMessageCompression() throws Exception {
         ActiveMQStreamMessage tcpMessage = (ActiveMQStreamMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
         ActiveMQStreamMessage httpMessage = (ActiveMQStreamMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
 
@@ -244,9 +205,18 @@ public class HttpSendCompressedMessagesT
     }
 
     @Test
-    public void testMapMessageCompression() throws Exception {
+    public void testMapMessageCompressionFromTcp() throws Exception {
         sendMapMessage(true);
+        doTestMapMessageCompression();
+    }
 
+    @Test
+    public void testMapMessageCompressionFromHttp() throws Exception {
+        sendMapMessage(httpConnectionFactory, true);
+        doTestMapMessageCompression();
+    }
+
+    private void doTestMapMessageCompression() throws Exception {
         ActiveMQMapMessage tcpMessage = (ActiveMQMapMessage) tcpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
         ActiveMQMapMessage httpMessage = (ActiveMQMapMessage) httpConsumer.receive(TimeUnit.SECONDS.toMillis(3));
 
@@ -317,4 +287,86 @@ public class HttpSendCompressedMessagesT
             broker.waitUntilStopped();
         }
     }
+
+    private void sendTextMessage(boolean compressed) throws Exception {
+        sendTextMessage(tcpConnectionFactory, compressed);
+    }
+
+    private void sendTextMessage(ActiveMQConnectionFactory factory, boolean compressed) throws
Exception {
+
+        StringBuilder builder = new StringBuilder();
+        for(int i = 0; i < 10; ++i) {
+            builder.append(UUID.randomUUID().toString());
+        }
+
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        connection.setUseCompression(compressed);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic destination = session.createTopic(destinationName);
+        MessageProducer producer = session.createProducer(destination);
+        producer.send(session.createTextMessage(builder.toString()));
+    }
+
+    private void sendBytesMessage(boolean compressed) throws Exception {
+        sendBytesMessage(tcpConnectionFactory, compressed);
+    }
+
+    private void sendBytesMessage(ActiveMQConnectionFactory factory, boolean compressed)
throws Exception {
+
+        StringBuilder builder = new StringBuilder();
+        for(int i = 0; i < 10; ++i) {
+            builder.append(UUID.randomUUID().toString());
+        }
+
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        connection.setUseCompression(compressed);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic destination = session.createTopic(destinationName);
+        MessageProducer producer = session.createProducer(destination);
+        BytesMessage message = session.createBytesMessage();
+        message.writeUTF(builder.toString());
+        producer.send(message);
+    }
+
+    private void sendStreamMessage(boolean compressed) throws Exception {
+        sendStreamMessage(tcpConnectionFactory, compressed);
+    }
+
+    private void sendStreamMessage(ActiveMQConnectionFactory factory, boolean compressed)
throws Exception {
+
+        StringBuilder builder = new StringBuilder();
+        for(int i = 0; i < 10; ++i) {
+            builder.append(UUID.randomUUID().toString());
+        }
+
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        connection.setUseCompression(compressed);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic destination = session.createTopic(destinationName);
+        MessageProducer producer = session.createProducer(destination);
+        StreamMessage message = session.createStreamMessage();
+        message.writeString(builder.toString());
+        producer.send(message);
+    }
+
+    private void sendMapMessage(boolean compressed) throws Exception {
+        sendMapMessage(tcpConnectionFactory, compressed);
+    }
+
+    private void sendMapMessage(ActiveMQConnectionFactory factory, boolean compressed) throws
Exception {
+
+        StringBuilder builder = new StringBuilder();
+        for(int i = 0; i < 10; ++i) {
+            builder.append(UUID.randomUUID().toString());
+        }
+
+        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        connection.setUseCompression(compressed);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic destination = session.createTopic(destinationName);
+        MessageProducer producer = session.createProducer(destination);
+        MapMessage message = session.createMapMessage();
+        message.setString("content", builder.toString());
+        producer.send(message);
+    }
 }



Mime
View raw message