activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-902 OpenWire Compression Issue
Date Fri, 23 Dec 2016 15:08:46 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 22f0fcf08 -> 79b48767d


ARTEMIS-902 OpenWire Compression Issue


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/eecbbb18
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/eecbbb18
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/eecbbb18

Branch: refs/heads/master
Commit: eecbbb18db195d97bb0135362be67acb0cc6a793
Parents: 22f0fcf
Author: Howard Gao <howard.gao@gmail.com>
Authored: Fri Dec 23 21:29:28 2016 +0800
Committer: Howard Gao <howard.gao@gmail.com>
Committed: Fri Dec 23 21:29:28 2016 +0800

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 16 +++--
 .../openwire/interop/CompressedInteropTest.java | 69 +++++++++++---------
 2 files changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/eecbbb18/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index f49c972..950210b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -490,10 +490,11 @@ public class OpenWireMessageConverter implements MessageConverter {
                   ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
                   OutputStream out = bytesOut;
                   if (isCompressed) {
-                     out = new DeflaterOutputStream(out);
+                     out = new DeflaterOutputStream(out, true);
                   }
                   try (DataOutputStream dataOut = new DataOutputStream(out)) {
                      MarshallingSupport.writeUTF8(dataOut, text.toString());
+                     dataOut.flush();
                      bytes = bytesOut.toByteArray();
                   }
                }
@@ -506,10 +507,11 @@ public class OpenWireMessageConverter implements MessageConverter {
                   ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
                   OutputStream os = out;
                   if (isCompressed) {
-                     os = new DeflaterOutputStream(os);
+                     os = new DeflaterOutputStream(os, true);
                   }
                   try (DataOutputStream dataOut = new DataOutputStream(os)) {
                      MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+                     dataOut.flush();
                   }
                   bytes = out.toByteArray();
                }
@@ -520,8 +522,9 @@ public class OpenWireMessageConverter implements MessageConverter {
                buffer.readBytes(bytes);
                if (isCompressed) {
                   ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                  try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
+                  try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
                      out.write(bytes);
+                     out.flush();
                   }
                   bytes = bytesOut.toByteArray();
                }
@@ -529,7 +532,7 @@ public class OpenWireMessageConverter implements MessageConverter {
                org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
                OutputStream out = bytesOut;
                if (isCompressed) {
-                  out = new DeflaterOutputStream(bytesOut);
+                  out = new DeflaterOutputStream(bytesOut, true);
                }
                try (DataOutputStream dataOut = new DataOutputStream(out)) {
 
@@ -583,6 +586,7 @@ public class OpenWireMessageConverter implements MessageConverter {
                            stop = true;
                            break;
                      }
+                     dataOut.flush();
                   }
                }
                bytes = bytesOut.toByteArray();
@@ -602,6 +606,7 @@ public class OpenWireMessageConverter implements MessageConverter {
                         int count = deflater.deflate(bytesBuf);
                         compressed.write(bytesBuf, 0, count);
                      }
+                     compressed.flush();
                      ByteSequence byteSeq = compressed.toByteSequence();
                      ByteSequenceData.writeIntBig(byteSeq, length);
                      bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
@@ -615,8 +620,9 @@ public class OpenWireMessageConverter implements MessageConverter {
                buffer.readBytes(bytes);
                if (isCompressed) {
                   try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                       DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
+                       DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
                      out.write(bytes);
+                     out.flush();
                      bytes = bytesOut.toByteArray();
                   }
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/eecbbb18/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
index ada2e55..1966609 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire.interop;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -63,24 +64,32 @@ public class CompressedInteropTest extends BasicOpenWireTest {
       xaFactory.setUseCompression(true);
    }
 
-
    @Test
    public void testCoreReceiveOpenWireCompressedMessages() throws Exception {
+      testCompressedMessageSendReceive(true);
+   }
+
+   @Test
+   public void testOpenWireReceiveOpenWireCompressedMessages() throws Exception {
+      testCompressedMessageSendReceive(false);
+   }
+
+   private void testCompressedMessageSendReceive(boolean useCore) throws Exception {
       //TextMessage
       sendCompressedTextMessageUsingOpenWire();
-      receiveTextMessageUsingCore();
+      receiveTextMessage(useCore);
       //BytesMessage
       sendCompressedBytesMessageUsingOpenWire();
-      receiveBytesMessageUsingCore();
+      receiveBytesMessage(useCore);
       //MapMessage
       sendCompressedMapMessageUsingOpenWire();
-      receiveMapMessageUsingCore();
+      receiveMapMessage(useCore);
       //StreamMessage
       sendCompressedStreamMessageUsingOpenWire();
-      receiveStreamMessageUsingCore();
+      receiveStreamMessage(useCore);
       //ObjectMessage
       sendCompressedObjectMessageUsingOpenWire();
-      receiveObjectMessageUsingCore();
+      receiveObjectMessage(useCore);
    }
 
    private void sendCompressedStreamMessageUsingOpenWire() throws Exception {
@@ -106,8 +115,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
       producer.send(streamMessage);
    }
 
-   private void receiveStreamMessageUsingCore() throws Exception {
-      StreamMessage streamMessage = (StreamMessage) receiveMessageUsingCore();
+   private void receiveStreamMessage(boolean useCore) throws Exception {
+      StreamMessage streamMessage = (StreamMessage) receiveMessage(useCore);
       boolean booleanVal = streamMessage.readBoolean();
       assertTrue(booleanVal);
       byte byteVal = streamMessage.readByte();
@@ -149,8 +158,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
       producer.send(objectMessage);
    }
 
-   private void receiveObjectMessageUsingCore() throws Exception {
-      ObjectMessage objectMessage = (ObjectMessage) receiveMessageUsingCore();
+   private void receiveObjectMessage(boolean useCore) throws Exception {
+      ObjectMessage objectMessage = (ObjectMessage) receiveMessage(useCore);
       Object objectVal = objectMessage.getObject();
       assertEquals(TEXT, objectVal);
    }
@@ -178,8 +187,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
       producer.send(mapMessage);
    }
 
-   private void receiveMapMessageUsingCore() throws Exception {
-      MapMessage mapMessage = (MapMessage) receiveMessageUsingCore();
+   private void receiveMapMessage(boolean useCore) throws Exception {
+      MapMessage mapMessage = (MapMessage) receiveMessage(useCore);
 
       boolean booleanVal = mapMessage.getBoolean("boolean-type");
       assertTrue(booleanVal);
@@ -222,8 +231,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
       producer.send(bytesMessage);
    }
 
-   private void receiveBytesMessageUsingCore() throws Exception {
-      BytesMessage bytesMessage = (BytesMessage) receiveMessageUsingCore();
+   private void receiveBytesMessage(boolean useCore) throws Exception {
+      BytesMessage bytesMessage = (BytesMessage) receiveMessage(useCore);
 
       byte[] bytes = new byte[TEXT.getBytes(StandardCharsets.UTF_8).length];
       bytesMessage.readBytes(bytes);
@@ -233,16 +242,28 @@ public class CompressedInteropTest extends BasicOpenWireTest {
       assertEquals(TEXT, rcvString);
    }
 
-   private void receiveTextMessageUsingCore() throws Exception {
-      TextMessage txtMessage = (TextMessage) receiveMessageUsingCore();
+   private void receiveTextMessage(boolean useCore) throws Exception {
+      TextMessage txtMessage = (TextMessage) receiveMessage(useCore);
       assertEquals(TEXT, txtMessage.getText());
    }
 
-   private Message receiveMessageUsingCore() throws Exception {
+   private void sendCompressedTextMessageUsingOpenWire() throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+      final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+      TextMessage textMessage = session.createTextMessage(TEXT);
+
+      producer.send(textMessage);
+   }
+
+   private Message receiveMessage(boolean useCore) throws Exception {
+      ConnectionFactory factoryToUse = useCore ? coreCf : factory;
       Connection jmsConn = null;
       Message message = null;
       try {
-         jmsConn = coreCf.createConnection();
+         jmsConn = factoryToUse.createConnection();
          jmsConn.start();
 
          Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -257,16 +278,4 @@ public class CompressedInteropTest extends BasicOpenWireTest {
       }
       return message;
    }
-
-   private void sendCompressedTextMessageUsingOpenWire() throws Exception {
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
-
-      final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
-
-      TextMessage textMessage = session.createTextMessage(TEXT);
-
-      producer.send(textMessage);
-   }
-
 }


Mime
View raw message