activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6438
Date Thu, 29 Sep 2016 15:12:42 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x aa32a0f79 -> a6d2a16b4


https://issues.apache.org/jira/browse/AMQ-6438

Improve interop for MapMessage payloads whose entries contain Binary values:
convert between byte[] and AMQP Binary in both directions so that the Message
is treated as a MapMessage and does not fall back to a BytesMessage encoding.
(cherry picked from commit d88c4e46ecffdba0040299df6efbcdc4171d1c3d)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a6d2a16b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a6d2a16b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a6d2a16b

Branch: refs/heads/activemq-5.14.x
Commit: a6d2a16b4c03d75a013947c8d4b6c48bdf7a60f0
Parents: aa32a0f
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Sep 29 11:09:34 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Sep 29 11:12:34 2016 -0400

----------------------------------------------------------------------
 .../amqp/message/AmqpMessageSupport.java        | 14 +++-
 .../message/JMSMappingInboundTransformer.java   |  8 +-
 .../transport/amqp/JMSInteroperabilityTest.java | 84 +++++++++++++++++---
 .../JMSMappingInboundTransformerTest.java       | 35 ++++++++
 .../JMSMappingOutboundTransformerTest.java      | 28 +++++++
 5 files changed, 156 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a6d2a16b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
index 4f468ba..86151a1 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
@@ -23,7 +23,9 @@ import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.zip.InflaterInputStream;
 
 import javax.jms.JMSException;
@@ -313,13 +315,19 @@ public final class AmqpMessageSupport {
      * @throws JMSException if an error occurs in constructing or fetching the Map.
      */
     public static Map<String, Object> getMapFromMessageBody(ActiveMQMapMessage message)
throws JMSException {
-        final HashMap<String, Object> map = new HashMap<String, Object>();
+        final HashMap<String, Object> map = new LinkedHashMap<String, Object>();
 
         final Map<String, Object> contentMap = message.getContentMap();
         if (contentMap != null) {
-            map.putAll(contentMap);
+            for (Entry<String, Object> entry : contentMap.entrySet()) {
+                Object value = entry.getValue();
+                if (value instanceof byte[]) {
+                    value = new Binary((byte[]) value);
+                }
+                map.put(entry.getKey(), value);
+            }
         }
 
-        return contentMap;
+        return map;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a6d2a16b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
index 79e4c2c..e121cec 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java
@@ -36,6 +36,7 @@ import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -202,7 +203,12 @@ public class JMSMappingInboundTransformer extends InboundTransformer
{
         ActiveMQMapMessage message = new ActiveMQMapMessage();
         final Set<Map.Entry<String, Object>> set = content.entrySet();
         for (Map.Entry<String, Object> entry : set) {
-            message.setObject(entry.getKey(), entry.getValue());
+            Object value = entry.getValue();
+            if (value instanceof Binary) {
+                Binary binary = (Binary) value;
+                value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength());
+            }
+            message.setObject(entry.getKey(), value);
         }
         return message;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a6d2a16b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index fa61e14..e3d3b17 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -34,6 +34,7 @@ import java.util.UUID;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -42,6 +43,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.qpid.proton.amqp.Binary;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -253,7 +255,73 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testMapMessageSendReceive() throws Exception {
+    public void testMapMessageUsingPrimitiveSettersSendReceive() throws Exception {
+        Connection openwire = createJMSConnection();
+        Connection amqp = createConnection();
+
+        openwire.start();
+        amqp.start();
+
+        Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination queue = openwireSession.createQueue(getDestinationName());
+
+        MessageProducer openwireProducer = openwireSession.createProducer(queue);
+        MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+        byte[] bytesValue = new byte[] { 1, 2, 3, 4, 5 };
+
+        // Create the Message
+        MapMessage outgoing = openwireSession.createMapMessage();
+
+        outgoing.setBoolean("boolean", true);
+        outgoing.setByte("byte", (byte) 10);
+        outgoing.setBytes("bytes", bytesValue);
+        outgoing.setChar("char", 'B');
+        outgoing.setDouble("double", 24.42);
+        outgoing.setFloat("float", 3.14159f);
+        outgoing.setInt("integer", 1024);
+        outgoing.setLong("long", 8096l);
+        outgoing.setShort("short", (short) 255);
+
+        openwireProducer.send(outgoing);
+
+        // Now consume the MapMessage
+        Message received = amqpConsumer.receive(2000);
+        assertNotNull(received);
+        assertTrue("Expected MapMessage but got " + received, received instanceof ObjectMessage);
+        ObjectMessage incoming = (ObjectMessage) received;
+
+        Map<String, Object> incomingMap = (Map<String, Object>) incoming.getObject();
+
+        assertEquals(true, incomingMap.get("boolean"));
+        assertEquals(10, (byte) incomingMap.get("byte"));
+        assertEquals('B', incomingMap.get("char"));
+        assertEquals(24.42, (double) incomingMap.get("double"), 0.5);
+        assertEquals(3.14159f, (float) incomingMap.get("float"), 0.5f);
+        assertEquals(1024, incomingMap.get("integer"));
+        assertEquals(8096l, incomingMap.get("long"));
+        assertEquals(255, (short) incomingMap.get("short"));
+
+        // Test for the byte array which will be in an AMQP Binary as this message
+        // is received as an ObjectMessage by Qpid JMS
+        Object incomingValue = incomingMap.get("bytes");
+        assertNotNull(incomingValue);
+        assertTrue(incomingValue instanceof Binary);
+        Binary incomingBinary = (Binary) incomingValue;
+        byte[] incomingBytes = Arrays.copyOfRange(incomingBinary.getArray(), incomingBinary.getArrayOffset(),
incomingBinary.getLength());
+        assertTrue(Arrays.equals(bytesValue, incomingBytes));
+
+        amqp.close();
+        openwire.close();
+    }
+
+    //----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMapInObjectMessageSendReceive() throws Exception {
         Connection openwire = createJMSConnection();
         Connection amqp = createConnection();
 
@@ -284,7 +352,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
 
         openwireProducer.send(outgoing);
 
-        // Now consumer the ObjectMessage
+        // Now consume the ObjectMessage
         Message received = amqpConsumer.receive(2000);
         assertNotNull(received);
         assertTrue("Expected ObjectMessage but got " + received, received instanceof ObjectMessage);
@@ -300,8 +368,6 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         openwire.close();
     }
 
-    //----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
-
     @Test
     public void testQpidToOpenWireObjectMessage() throws Exception {
 
@@ -327,7 +393,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         outgoing.setObject(UUID.randomUUID());
         amqpProducer.send(outgoing);
 
-        // Now consumer the ObjectMessage
+        // Now consume the ObjectMessage
         Message received = openwireConsumer.receive(2000);
         assertNotNull(received);
         LOG.info("Read new message: {}", received);
@@ -366,7 +432,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         outgoing.setObject(UUID.randomUUID());
         openwireProducer.send(outgoing);
 
-        // Now consumer the ObjectMessage
+        // Now consume the ObjectMessage
         Message received = amqpConsumer.receive(2000);
         assertNotNull(received);
         LOG.info("Read new message: {}", received);
@@ -407,7 +473,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
         outgoing.setObject(UUID.randomUUID());
         openwireProducer.send(outgoing);
 
-        // Now consumer the ObjectMessage
+        // Now consume the ObjectMessage
         Message received = amqpConsumer.receive(2000);
         assertNotNull(received);
         LOG.info("Read new message: {}", received);
@@ -454,7 +520,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
 
         openwireProducer.send(outgoing);
 
-        // Now consumer the ObjectMessage
+        // Now consume the ObjectMessage
         Message received = amqpConsumer.receive(2000);
         assertNotNull(received);
         assertTrue(received instanceof ObjectMessage);
@@ -499,7 +565,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
             amqpProducer.send(outgoing);
         }
 
-        // Now consumer the message
+        // Now consume the message
         for (int i = 0; i < NUM_MESSAGES; ++i) {
             Message received = amqpConsumer.receive(2000);
             assertNotNull(received);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a6d2a16b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
index 1427b5a..6ac080a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
@@ -24,12 +24,14 @@ import static org.junit.Assert.assertTrue;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 import javax.jms.Destination;
+import javax.jms.MapMessage;
 import javax.jms.Queue;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
@@ -437,6 +439,39 @@ public class JMSMappingInboundTransformerTest {
     }
 
     /**
+     * Test that an amqp-value body containing a map that has an AMQP Binary as one of
+     * its entries results in a MapMessage from which a byte array can be read for that
+     * entry.
+     *
+     * @throws Exception if an error occurs during the test.
+     */
+    @Test
+    public void testCreateAmqpMapMessageFromAmqpValueWithMapContainingBinaryEntry() throws
Exception {
+        final String ENTRY_NAME = "bytesEntry";
+
+        Message message = Proton.message();
+        Map<String, Object> map = new HashMap<String, Object>();
+
+        byte[] inputBytes = new byte[] { 1, 2, 3, 4, 5 };
+        map.put(ENTRY_NAME, new Binary(inputBytes));
+
+        message.setBody(new AmqpValue(map));
+
+        EncodedMessage em = encodeMessage(message);
+
+        JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
+        javax.jms.Message jmsMessage = transformer.transform(em);
+
+        assertNotNull("Message should not be null", jmsMessage);
+        assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass());
+
+        MapMessage mapMessage = (MapMessage) jmsMessage;
+        byte[] outputBytes = mapMessage.getBytes(ENTRY_NAME);
+        assertNotNull(outputBytes);
+        assertTrue(Arrays.equals(inputBytes, outputBytes));
+    }
+
+    /**
      * Test that an amqp-value body containing a list results in an StreamMessage
      * when not otherwise annotated to indicate the type of JMS message it is.
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/a6d2a16b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index ee69650..f0167b7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -276,6 +276,34 @@ public class JMSMappingOutboundTransformerTest {
     }
 
     @Test
+    public void testConvertMapMessageToAmqpMessageWithByteArrayValueInBody() throws Exception
{
+        final byte[] byteArray = new byte[] { 1, 2, 3, 4, 5 };
+
+        ActiveMQMapMessage outbound = createMapMessage();
+        outbound.setBytes("bytes", byteArray);
+        outbound.onSend();
+        outbound.storeContent();
+
+        JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
+
+        EncodedMessage encoded = transformer.transform(outbound);
+        assertNotNull(encoded);
+
+        Message amqp = encoded.decode();
+
+        assertNotNull(amqp.getBody());
+        assertTrue(amqp.getBody() instanceof AmqpValue);
+        assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
+
+        @SuppressWarnings("unchecked")
+        Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
+
+        assertEquals(1, amqpMap.size());
+        Binary readByteArray = (Binary) amqpMap.get("bytes");
+        assertNotNull(readByteArray);
+    }
+
+    @Test
     public void testConvertMapMessageToAmqpMessage() throws Exception {
         ActiveMQMapMessage outbound = createMapMessage();
         outbound.setString("property-1", "string");


Mime
View raw message