activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1188839 - in /activemq/trunk/activemq-optional/src: main/java/org/apache/activemq/transport/http/ test/java/org/apache/activemq/transport/http/
Date Tue, 25 Oct 2011 18:03:38 GMT
Author: tabish
Date: Tue Oct 25 18:03:37 2011
New Revision: 1188839

URL: http://svn.apache.org/viewvc?rev=1188839&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3566

Added:
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJMSMessagesWithCompressionTest.java   (with props)
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJmsSendAndReceiveWithCompressionTest.java   (with props)
Modified:
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java?rev=1188839&r1=1188838&r2=1188839&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java Tue Oct 25 18:03:37 2011
@@ -20,22 +20,30 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.util.TextWireFormat;
+import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.http.Header;
 import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpRequestInterceptor;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.HttpResponseException;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpOptions;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.conn.params.ConnRoutePNames;
 import org.apache.http.entity.ByteArrayEntity;
@@ -45,6 +53,7 @@ import org.apache.http.impl.conn.tsccm.T
 import org.apache.http.message.AbstractHttpMessage;
 import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpContext;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +79,10 @@ public class HttpClientTransport extends
 
     private int soTimeout = MAX_CLIENT_TIMEOUT;
 
+    private boolean useCompression = false;
+    private boolean canSendCompressed = false;
+    private int minSendAsCompressedSize = 0;
+
     public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
         super(wireFormat, remoteUrl);
     }
@@ -87,6 +100,17 @@ public class HttpClientTransport extends
         configureMethod(httpMethod);
         String data = getTextWireFormat().marshalText(command);
         byte[] bytes = data.getBytes("UTF-8");
+        if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
+            stream.write(bytes);
+            stream.close();
+            httpMethod.addHeader("Content-Type", "application/x-gzip");
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size());
+            }
+            bytes = bytesOut.toByteArray();
+        }
         ByteArrayEntity entity = new ByteArrayEntity(bytes);
         httpMethod.setEntity(entity);
 
@@ -121,9 +145,20 @@ public class HttpClientTransport extends
         return null;
     }
 
+    private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
+        Header encoding = answer.getEntity().getContentEncoding();
+        if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
+            return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
+        } else {
+            return new DataInputStream(answer.getEntity().getContent());
+        }
+    }
+
     public void run() {
 
-        LOG.trace("HTTP GET consumer thread starting: " + this);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("HTTP GET consumer thread starting: " + this);
+        }
         HttpClient httpClient = getReceiveHttpClient();
         URI remoteUrl = getRemoteUrl();
 
@@ -151,7 +186,7 @@ public class HttpClientTransport extends
                     }
                 } else {
                     receiveCounter++;
-                    DataInputStream stream = new DataInputStream(answer.getEntity().getContent());
+                    DataInputStream stream = createDataInputStream(answer);
                     Object command = (Object)getTextWireFormat().unmarshal(stream);
                     if (command == null) {
                         LOG.debug("Received null command from url: " + remoteUrl);
@@ -202,15 +237,40 @@ public class HttpClientTransport extends
     // -------------------------------------------------------------------------
     protected void doStart() throws Exception {
 
-        LOG.trace("HTTP GET consumer thread starting: " + this);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("HTTP GET consumer thread starting: " + this);
+        }
         HttpClient httpClient = getReceiveHttpClient();
         URI remoteUrl = getRemoteUrl();
 
         HttpHead httpMethod = new HttpHead(remoteUrl.toString());
         configureMethod(httpMethod);
-        ResponseHandler<String> handler = new BasicResponseHandler();
+
+        // Request the options from the server so we can find out if the broker we are
+        // talking to supports GZip compressed content.  If so and useCompression is on
+        // then we can compress our POST data, otherwise we must send it uncompressed to
+        // ensure backwards compatibility.
+        HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString());
+        ResponseHandler<String> handler = new BasicResponseHandler() {
+            @Override
+            public String handleResponse(HttpResponse response) throws HttpResponseException, IOException {
+
+                for(Header header : response.getAllHeaders()) {
+                    if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) {
+                        LOG.info("Broker Servlet supports GZip compression.");
+                        canSendCompressed = true;
+                        break;
+                    }
+                }
+
+                return super.handleResponse(response);
+            }
+        };
+
+
         try {
-            httpClient.execute(httpMethod, handler);
+            httpClient.execute(httpMethod, new BasicResponseHandler());
+            httpClient.execute(optionsMethod, handler);
         } catch(Exception e) {
             throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage());
         }
@@ -226,6 +286,15 @@ public class HttpClientTransport extends
 
     protected HttpClient createHttpClient() {
         DefaultHttpClient client = new DefaultHttpClient(new ThreadSafeClientConnManager());
+        if (useCompression) {
+            client.addRequestInterceptor( new HttpRequestInterceptor() {
+                @Override
+                public void process(HttpRequest request, HttpContext context) {
+                    // We expect to received a compression response that we un-gzip
+                    request.addHeader("Accept-Encoding", "gzip");
+                }
+            });
+        }
         if (getProxyHost() != null) {
             HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
             client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
@@ -262,4 +331,30 @@ public class HttpClientTransport extends
     public void setSoTimeout(int soTimeout) {
         this.soTimeout = soTimeout;
     }
+
+    public void setUseCompression(boolean useCompression) {
+        this.useCompression = useCompression;
+    }
+
+    public boolean isUseCompression() {
+        return this.useCompression;
+    }
+
+    public int getMinSendAsCompressedSize() {
+        return minSendAsCompressedSize;
+    }
+
+    /**
+     * Sets the minimum size that must be exceeded on a send before compression is used if
+     * the useCompression option is specified.  For very small payloads compression can be
+     * inefficient compared to the transmission size savings.
+     *
+     * Default value is 0.
+     *
+     * @param minSendAsCompressedSize
+     */
+    public void setMinSendAsCompressedSize(int minSendAsCompressedSize) {
+        this.minSendAsCompressedSize = minSendAsCompressedSize;
+    }
+
 }

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java?rev=1188839&r1=1188838&r2=1188839&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java Tue Oct 25 18:03:37 2011
@@ -26,6 +26,7 @@ import org.apache.activemq.transport.xst
 import org.apache.activemq.util.ServiceStopper;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.GzipHandler;
 import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 
@@ -88,6 +89,10 @@ public class HttpTransportServer extends
         contextHandler.setAttribute("wireFormat", getWireFormat());
         contextHandler.setAttribute("transportFactory", transportFactory);
         contextHandler.setAttribute("transportOptions", transportOptions);
+
+        GzipHandler gzipHandler = new GzipHandler();
+        contextHandler.setHandler(gzipHandler);
+
         server.start();
     }
 

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java?rev=1188839&r1=1188838&r2=1188839&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java Tue Oct 25 18:03:37 2011
@@ -19,14 +19,16 @@ package org.apache.activemq.transport.ht
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+
 import javax.servlet.ServletException;
-import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -78,6 +80,12 @@ public class HttpTunnelServlet extends H
     }
 
     @Override
+    protected void doOptions(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        response.addHeader("Accepts-Encoding", "gzip");
+        super.doOptions(request, response);
+    }
+
+    @Override
     protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
         createTransportChannel(request, response);
     }
@@ -107,12 +115,16 @@ public class HttpTunnelServlet extends H
     }
 
     @Override
-    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException,
-            IOException {
+    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+
+        InputStream stream = request.getInputStream();
+        String contentType = request.getContentType();
+        if (contentType != null && contentType.equals("application/x-gzip")) {
+            stream = new GZIPInputStream(stream);
+        }
 
         // Read the command directly from the reader, assuming UTF8 encoding
-        ServletInputStream sis = request.getInputStream();
-        Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(sis, "UTF-8"));
+        Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(stream, "UTF-8"));
 
         if (command instanceof WireFormatInfo) {
             WireFormatInfo info = (WireFormatInfo) command;

Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJMSMessagesWithCompressionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJMSMessagesWithCompressionTest.java?rev=1188839&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJMSMessagesWithCompressionTest.java (added)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJMSMessagesWithCompressionTest.java Tue Oct 25 18:03:37 2011
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.http;
+
+import java.net.URISyntaxException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HttpJMSMessagesWithCompressionTest {
+
+    private static final AtomicInteger counter = new AtomicInteger(1);
+
+    enum DESTINATION_TYPE { TOPIC, QUEUE };
+
+    protected BrokerService broker;
+    protected Connection connection;
+    protected DESTINATION_TYPE destinationType = DESTINATION_TYPE.QUEUE;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL());
+        ConnectionFactory factory = createConnectionFactory();
+        connection = factory.createConnection();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(getBrokerURL());
+        return factory;
+    }
+
+    protected String getBrokerURL() {
+        return "http://localhost:8161?useCompression=true";
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.setUseJmx(false);
+        answer.setManagementContext(null);
+        answer.addConnector(getBrokerURL());
+        return answer;
+    }
+
+    protected Destination createDestination(Session session, DESTINATION_TYPE destinationType) throws JMSException {
+        switch(destinationType) {
+        case TOPIC:
+            return session.createTopic("TOPIC." + counter.getAndIncrement());
+        case QUEUE:
+            return session.createQueue("QUEUE." + counter.getAndIncrement());
+        }
+        Assert.fail("Invalid destination type: " + destinationType);
+        return null;
+    }
+
+    abstract class MessageCommand<M extends Message> {
+
+        public final void assertMessage(M message) throws JMSException {
+            Assert.assertNotNull(message);
+            completeCheck(message);
+        }
+
+        public abstract void completeCheck(M message) throws JMSException;
+
+        public abstract M createMessage(Session session) throws JMSException;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <E extends Message> void executeTest(MessageCommand<E> messageCommand) throws JMSException {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        {
+            E message = messageCommand.createMessage(session);
+            producer.send(message);
+        }
+
+        {
+            E message = (E)consumer.receive(1000);
+            messageCommand.assertMessage(message);
+        }
+
+        Assert.assertNull(consumer.receiveNoWait());
+    }
+
+    @Test
+    public void testTextMessage() throws Exception {
+        executeTest(new MessageCommand<TextMessage>() {
+            private String textString = "This is a simple text string";
+
+            public TextMessage createMessage(Session session) throws JMSException {
+                return session.createTextMessage(textString);
+            }
+
+            public void completeCheck(TextMessage message) throws JMSException {
+                Assert.assertEquals("The returned text string was different", textString, message.getText());
+            }
+        });
+    }
+
+    @Test
+    public void testBytesMessage() throws Exception {
+        executeTest(new MessageCommand<BytesMessage>() {
+            private byte[] bytes = "This is a simple text string".getBytes();
+
+            public BytesMessage createMessage(Session session) throws JMSException {
+                BytesMessage message =  session.createBytesMessage();
+                message.writeBytes(bytes);
+                return message;
+            }
+
+            public void completeCheck(BytesMessage message) throws JMSException {
+                byte[] result = new byte[bytes.length];
+                message.readBytes(result);
+                Assert.assertArrayEquals("The returned byte array was different", bytes, result);
+            }
+        });
+    }
+
+    @Test
+    public void testMapMessage() throws Exception {
+        executeTest(new MessageCommand<MapMessage>() {
+            public MapMessage createMessage(Session session) throws JMSException {
+                MapMessage message =  session.createMapMessage();
+                message.setInt("value", 13);
+                return message;
+            }
+
+            public void completeCheck(MapMessage message) throws JMSException {
+                Assert.assertEquals("The returned mapped value was different", 13, message.getInt("value"));
+            }
+        });
+    }
+
+    @Test
+    public void testObjectMessage() throws Exception {
+        executeTest(new MessageCommand<ObjectMessage>() {
+            private Long value = new Long(101);
+
+            public ObjectMessage createMessage(Session session) throws JMSException {
+                return session.createObjectMessage(value);
+            }
+
+            public void completeCheck(ObjectMessage message) throws JMSException {
+                Assert.assertEquals("The returned object was different", value, message.getObject());
+            }
+        });
+    }
+
+    @Test
+    public void testStreamMessage() throws Exception {
+        executeTest(new MessageCommand<StreamMessage>() {
+            private Long value = new Long(1013);
+
+            public StreamMessage createMessage(Session session) throws JMSException {
+                StreamMessage message = session.createStreamMessage();
+                message.writeObject(value);
+                return message;
+            }
+
+            public void completeCheck(StreamMessage message) throws JMSException {
+                Assert.assertEquals("The returned stream object was different", value, message.readObject());
+            }
+        });
+    }
+}

Propchange: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJMSMessagesWithCompressionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJmsSendAndReceiveWithCompressionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJmsSendAndReceiveWithCompressionTest.java?rev=1188839&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJmsSendAndReceiveWithCompressionTest.java (added)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJmsSendAndReceiveWithCompressionTest.java Tue Oct 25 18:03:37 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.http;
+
+import java.util.List;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the Wire Level Http GZip compression.
+ */
+public class HttpJmsSendAndReceiveWithCompressionTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
+    private static final Logger logger = LoggerFactory.getLogger(HttpJmsSendAndReceiveWithCompressionTest.class);
+
+    protected BrokerService broker;
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+            broker.start();
+        }
+        super.setUp();
+        WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL());
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURL());
+        return connectionFactory;
+    }
+
+    protected String getBrokerURL() {
+        return "http://localhost:8161?useCompression=true";
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.addConnector(getBrokerURL());
+        return answer;
+    }
+
+    protected void consumeMessage(Message message, List<Message> messageList) {
+        super.consumeMessage(message, messageList);
+        if (message instanceof TextMessage) {
+            TextMessage textMessage = TextMessage.class.cast(message);
+            try {
+                logger.debug("Received text message with text: {}", textMessage.getText());
+            } catch( javax.jms.JMSException jmsE) {
+                logger.debug("Received an exception while trying to retrieve the text message", jmsE);
+                throw new RuntimeException(jmsE);
+            }
+        } else {
+            logger.debug("Received a non text message: {}", message);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpJmsSendAndReceiveWithCompressionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java?rev=1188839&r1=1188838&r2=1188839&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java (original)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/HttpSendCompressedMessagesTest.java Tue Oct 25 18:03:37 2011
@@ -44,6 +44,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * This test covers the Message Compression feature of the ActiveMQConnectionFactory.setUseCompression
+ * and has no relation to Http transport level compression.  The Messages are compressed using the
+ * deflate algorithm by the ActiveMQ layer before marshalled to XML so only the Message body will
+ * be compressed.
+ */
 public class HttpSendCompressedMessagesTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(HttpSendCompressedMessagesTest.class);



Mime
View raw message