activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: AMQ-7001 Resolve issues with encode failures on copied messages
Date Wed, 01 Aug 2018 22:14:54 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x a8b2fa1a0 -> ad0ae74f3


AMQ-7001 Resolve issues with encode failures on copied messages

Ensure that messages are copied to avoid contention on message content
when concurrent store and dispatch is enabled, and better handle a missing
AMQP message format value.  Adds an AMQP Encoder for UTF8Buffer that
encodes down to the AMQP String encodings, so that OpenWire messages such
as MapMessage, which can contain UTF8Buffer instances for String keys and
values, can be encoded.

(cherry picked from commit 9ec6ee43b146736fdf635090b2829a0c621a00f3)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ad0ae74f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ad0ae74f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ad0ae74f

Branch: refs/heads/activemq-5.15.x
Commit: ad0ae74f3b432f96aba17957bbb6dca0e0bdf8f5
Parents: a8b2fa1
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Aug 1 18:13:37 2018 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Aug 1 18:14:45 2018 -0400

----------------------------------------------------------------------
 .../message/AMQPNativeOutboundTransformer.java  |  14 +-
 .../message/JMSMappingOutboundTransformer.java  |  20 +-
 .../transport/amqp/message/UTF8BufferType.java  | 205 +++++++++++++++++++
 .../transport/amqp/protocol/AmqpSender.java     |   7 +-
 .../activemq/transport/amqp/JMSClientTest.java  |  28 +++
 .../amqp/message/UTF8BufferTypeTest.java        | 160 +++++++++++++++
 6 files changed, 416 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ad0ae74f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index cbc3461..37053a8 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -41,11 +41,15 @@ public class AMQPNativeOutboundTransformer implements OutboundTransformer {
     }
 
     static EncodedMessage transform(OutboundTransformer options, ActiveMQBytesMessage message) throws JMSException {
-        long messageFormat;
-        try {
-            messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
-        } catch (MessageFormatException e) {
-            return null;
+        final long messageFormat;
+        if (message.propertyExists(JMS_AMQP_MESSAGE_FORMAT)) {
+            try {
+                messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
+            } catch (MessageFormatException e) {
+                return null;
+            }
+        } else {
+            messageFormat = 0;
         }
 
         Binary encodedMessage = getBinaryFromMessageBody(message);

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad0ae74f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 5b22099..f3db5e4 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -102,11 +102,17 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
     public static final byte TEMP_QUEUE_TYPE = 0x02;
     public static final byte TEMP_TOPIC_TYPE = 0x03;
 
+    private final UTF8BufferType utf8BufferEncoding;
+
     // For now Proton requires that we create a decoder to create an encoder
     private final DecoderImpl decoder = new DecoderImpl();
     private final EncoderImpl encoder = new EncoderImpl(decoder);
     {
         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+
+        utf8BufferEncoding = new UTF8BufferType(encoder, decoder);
+
+        encoder.register(utf8BufferEncoding);
     }
 
     @Override
@@ -159,7 +165,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
             }
             properties.setTo(destination.getQualifiedName());
             if (maMap == null) {
-                maMap = new HashMap<Symbol, Object>();
+                maMap = new HashMap<>();
             }
             maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
         }
@@ -170,7 +176,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
             }
             properties.setReplyTo(replyTo.getQualifiedName());
             if (maMap == null) {
-                maMap = new HashMap<Symbol, Object>();
+                maMap = new HashMap<>();
             }
             maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
         }
@@ -276,7 +282,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
                     continue;
                 } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                     if (maMap == null) {
-                        maMap = new HashMap<Symbol, Object>();
+                        maMap = new HashMap<>();
                     }
                     String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
                     maMap.put(Symbol.valueOf(name), value);
@@ -307,14 +313,14 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
                     continue;
                 } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                     if (daMap == null) {
-                        daMap = new HashMap<Symbol, Object>();
+                        daMap = new HashMap<>();
                     }
                     String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
                     daMap.put(Symbol.valueOf(name), value);
                     continue;
                 } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                     if (footerMap == null) {
-                        footerMap = new HashMap<Object, Object>();
+                        footerMap = new HashMap<>();
                     }
                     String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
                     footerMap.put(name, value);
@@ -328,7 +334,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
             // The property didn't map into any other slot so we store it in the
             // Application Properties section of the message.
             if (apMap == null) {
-                apMap = new HashMap<String, Object>();
+                apMap = new HashMap<>();
             }
             apMap.put(key, value);
         }
@@ -409,7 +415,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
         } else if (messageType == CommandTypes.ACTIVEMQ_MAP_MESSAGE) {
             body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message));
         } else if (messageType == CommandTypes.ACTIVEMQ_STREAM_MESSAGE) {
-            ArrayList<Object> list = new ArrayList<Object>();
+            ArrayList<Object> list = new ArrayList<>();
             final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message;
             try {
                 while (true) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad0ae74f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/UTF8BufferType.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/UTF8BufferType.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/UTF8BufferType.java
new file mode 100644
index 0000000..387032e
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/UTF8BufferType.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.PrimitiveType;
+import org.apache.qpid.proton.codec.PrimitiveTypeEncoding;
+import org.apache.qpid.proton.codec.TypeEncoding;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+
+/**
+ * AMQP Type used to allow the proton-j codec to deal with UTF8Buffer types as if
+ * they were String elements.
+ */
+public class UTF8BufferType implements PrimitiveType<UTF8Buffer> {
+
+    private final UTF8BufferEncoding largeBufferEncoding;
+    private final UTF8BufferEncoding smallBufferEncoding;
+
+    public UTF8BufferType(EncoderImpl encoder, DecoderImpl decoder) {
+        this.largeBufferEncoding = new LargeUTF8BufferEncoding(encoder, decoder);
+        this.smallBufferEncoding = new SmallUTF8BufferEncoding(encoder, decoder);
+    }
+
+    @Override
+    public Class<UTF8Buffer> getTypeClass() {
+        return UTF8Buffer.class;
+    }
+
+    @Override
+    public PrimitiveTypeEncoding<UTF8Buffer> getEncoding(UTF8Buffer value) {
+        return value.getLength() <= 255 ? smallBufferEncoding : largeBufferEncoding;
+    }
+
+    @Override
+    public PrimitiveTypeEncoding<UTF8Buffer> getCanonicalEncoding() {
+        return largeBufferEncoding;
+    }
+
+    @Override
+    public Collection<? extends PrimitiveTypeEncoding<UTF8Buffer>> getAllEncodings() {
+        return Arrays.asList(smallBufferEncoding, largeBufferEncoding);
+    }
+
+    @Override
+    public void write(UTF8Buffer value) {
+        final TypeEncoding<UTF8Buffer> encoding = getEncoding(value);
+        encoding.writeConstructor();
+        encoding.writeValue(value);
+    }
+
+    public abstract class UTF8BufferEncoding implements PrimitiveTypeEncoding<UTF8Buffer> {
+
+        private final EncoderImpl encoder;
+        private final DecoderImpl decoder;
+
+        public UTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
+            this.encoder = encoder;
+            this.decoder = decoder;
+        }
+
+        @Override
+        public int getConstructorSize() {
+            return 1;
+        }
+
+        @Override
+        public boolean isFixedSizeVal() {
+            return false;
+        }
+
+        @Override
+        public boolean encodesJavaPrimitive() {
+            return false;
+        }
+
+        /**
+         * @return the number of bytes the size portion of the encoded value requires.
+         */
+        public abstract int getSizeBytes();
+
+        @Override
+        public void writeConstructor() {
+            getEncoder().writeRaw(getEncodingCode());
+        }
+
+        @Override
+        public void writeValue(UTF8Buffer value) {
+            writeSize(value);
+            WritableBuffer buffer = getEncoder().getBuffer();
+            buffer.put(value.getData(), value.getOffset(), value.getLength());
+        }
+
+        /**
+         * Write the size of the buffer using the appropriate type (byte or int) depending
+         * on the encoding type being used.
+         *
+         * @param value
+         *      The UTF8Buffer value that is being encoded.
+         */
+        public abstract void writeSize(UTF8Buffer value);
+
+        @Override
+        public int getValueSize(UTF8Buffer value) {
+            return getSizeBytes() + value.getLength();
+        }
+
+        @Override
+        public Class<UTF8Buffer> getTypeClass() {
+            return UTF8Buffer.class;
+        }
+
+        @Override
+        public PrimitiveType<UTF8Buffer> getType() {
+            return UTF8BufferType.this;
+        }
+
+        @Override
+        public boolean encodesSuperset(TypeEncoding<UTF8Buffer> encoding) {
+            return (getType() == encoding.getType());
+        }
+
+        @Override
+        public UTF8Buffer readValue() {
+            throw new UnsupportedOperationException("No decoding to UTF8Buffer exists");
+        }
+
+        @Override
+        public void skipValue() {
+            throw new UnsupportedOperationException("No decoding to UTF8Buffer exists");
+        }
+
+        public DecoderImpl getDecoder() {
+            return decoder;
+        }
+
+        public EncoderImpl getEncoder() {
+            return encoder;
+        }
+    }
+
+    public class LargeUTF8BufferEncoding extends UTF8BufferEncoding {
+
+        public LargeUTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
+            super(encoder, decoder);
+        }
+
+        @Override
+        public byte getEncodingCode() {
+            return EncodingCodes.STR32;
+        }
+
+        @Override
+        public int getSizeBytes() {
+            return Integer.BYTES;
+        }
+
+        @Override
+        public void writeSize(UTF8Buffer value) {
+            getEncoder().getBuffer().putInt(value.getLength());
+        }
+    }
+
+    public class SmallUTF8BufferEncoding extends UTF8BufferEncoding {
+
+        public SmallUTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
+            super(encoder, decoder);
+        }
+
+        @Override
+        public byte getEncodingCode() {
+            return EncodingCodes.STR8;
+        }
+
+        @Override
+        public int getSizeBytes() {
+            return Byte.BYTES;
+        }
+
+        @Override
+        public void writeSize(UTF8Buffer value) {
+            getEncoder().getBuffer().put((byte) value.getLength());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad0ae74f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 17185a0..9b75c1a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.transport.amqp.protocol;
 
 import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
-import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -449,11 +448,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
                 ActiveMQMessage temp = null;
                 if (md.getMessage() != null) {
-                    temp = (ActiveMQMessage) md.getMessage();
-                    if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) {
-                        temp = (ActiveMQMessage) md.getMessage().copy();
-                        temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0);
-                    }
+                    temp = (ActiveMQMessage) md.getMessage().copy();
                 }
 
                 final ActiveMQMessage jms = temp;

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad0ae74f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 97ce106..c240398 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -36,6 +36,7 @@ import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -99,6 +100,33 @@ public class JMSClientTest extends JMSClientTestSupport {
         }
     }
 
+    @Test(timeout = 60000)
+    public void testSendJMSMapMessage() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        // Round-trips a MapMessage through the broker and verifies its body entries arrive intact.
+        connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            assertNotNull(session);
+            Queue queue = session.createQueue(name.getMethodName());
+            MessageProducer producer = session.createProducer(queue);
+            MapMessage message = session.createMapMessage();
+            message.setBoolean("Boolean", false);
+            message.setString("STRING", "TEST");
+            producer.send(message);
+            QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+            assertEquals(1, proxy.getQueueSize());
+            // Consume and check the map body: "Boolean" was set as a body entry, not a message property.
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message received = consumer.receive(5000);
+            assertNotNull(received);
+            assertTrue(received instanceof MapMessage);
+            MapMessage map = (MapMessage) received;
+            assertEquals("TEST", map.getString("STRING"));
+            assertTrue(map.itemExists("Boolean") && !map.getBoolean("Boolean"));
+        }
+    }
+
     @Test(timeout=30000)
     public void testAnonymousProducerConsume() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad0ae74f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/UTF8BufferTypeTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/UTF8BufferTypeTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/UTF8BufferTypeTest.java
new file mode 100644
index 0000000..c9bc52c
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/UTF8BufferTypeTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.PrimitiveTypeEncoding;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the UTF8Buffer type encoder
+ */
+public class UTF8BufferTypeTest {
+
+    private final UTF8BufferType utf8BufferEncoding;
+    private final DecoderImpl decoder = new DecoderImpl();
+    private final EncoderImpl encoder = new EncoderImpl(decoder);
+    {
+        AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+
+        utf8BufferEncoding = new UTF8BufferType(encoder, decoder);
+
+        encoder.register(utf8BufferEncoding);
+    }
+
+    private String smallString = UUID.randomUUID().toString();
+    private String largeString = UUID.randomUUID().toString() + UUID.randomUUID().toString() +
+                                 UUID.randomUUID().toString() + UUID.randomUUID().toString() +
+                                 UUID.randomUUID().toString() + UUID.randomUUID().toString() +
+                                 UUID.randomUUID().toString() + UUID.randomUUID().toString();
+
+    private UTF8Buffer smallBuffer;
+    private UTF8Buffer largeBuffer;
+
+    @Before
+    public void setUp() {
+        smallBuffer = new UTF8Buffer(smallString.getBytes(StandardCharsets.UTF_8));
+        largeBuffer = new UTF8Buffer(largeString.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testGetAllEncodings() {
+        assertEquals(2, utf8BufferEncoding.getAllEncodings().size());
+    }
+
+    @Test
+    public void testGetTypeClass() {
+        assertEquals(UTF8Buffer.class, utf8BufferEncoding.getTypeClass());
+    }
+
+    @Test
+    public void testGetCanonicalEncoding() {
+        assertNotNull(utf8BufferEncoding.getCanonicalEncoding());
+    }
+
+    @Test
+    public void testGetEncodingForSmallUTF8Buffer() {
+        PrimitiveTypeEncoding<UTF8Buffer> encoding = utf8BufferEncoding.getEncoding(smallBuffer);
+
+        assertTrue(encoding instanceof UTF8BufferType.SmallUTF8BufferEncoding);
+        assertEquals(1, encoding.getConstructorSize());
+        assertEquals(smallBuffer.getLength() + Byte.BYTES, encoding.getValueSize(smallBuffer));
+        assertEquals(EncodingCodes.STR8, encoding.getEncodingCode());
+        assertFalse(encoding.encodesJavaPrimitive());
+        assertEquals(utf8BufferEncoding, encoding.getType());
+    }
+
+    @Test
+    public void testGetEncodingForLargeUTF8Buffer() {
+        PrimitiveTypeEncoding<UTF8Buffer> encoding = utf8BufferEncoding.getEncoding(largeBuffer);
+
+        assertTrue(encoding instanceof UTF8BufferType.LargeUTF8BufferEncoding);
+        assertEquals(1, encoding.getConstructorSize());
+        assertEquals(largeBuffer.getLength() + Integer.BYTES, encoding.getValueSize(largeBuffer));
+        assertEquals(EncodingCodes.STR32, encoding.getEncodingCode());
+        assertFalse(encoding.encodesJavaPrimitive());
+        assertEquals(utf8BufferEncoding, encoding.getType());
+    }
+
+    @Test
+    public void testEncodeDecodeEmptyStringBuffer() {
+        final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
+        encoder.setByteBuffer(buffer);
+        encoder.writeObject(new UTF8Buffer(""));
+
+        byte[] copy = new byte[buffer.getArrayLength()];
+        System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength());
+
+        ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy);
+        decoder.setBuffer(encoded);
+
+        Object valueRead = decoder.readObject();
+        assertTrue(valueRead instanceof String);
+        String decodedString = (String) valueRead;
+        assertEquals("", decodedString);
+    }
+
+    @Test
+    public void testEncodeDecodeSmallBuffer() {
+        final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
+        encoder.setByteBuffer(buffer);
+        encoder.writeObject(smallBuffer);
+
+        byte[] copy = new byte[buffer.getArrayLength()];
+        System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength());
+
+        ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy);
+        decoder.setBuffer(encoded);
+
+        Object valueRead = decoder.readObject();
+        assertTrue(valueRead instanceof String);
+        String decodedString = (String) valueRead;
+        assertEquals(smallString, decodedString);
+    }
+
+    @Test
+    public void testEncodeDecodeLargeBuffer() {
+        final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
+        encoder.setByteBuffer(buffer);
+        encoder.writeObject(largeBuffer);
+
+        byte[] copy = new byte[buffer.getArrayLength()];
+        System.arraycopy(buffer.getArray(), 0, copy, 0, buffer.getArrayLength());
+
+        ReadableBuffer encoded = ReadableBuffer.ByteBufferReader.wrap(copy);
+        decoder.setBuffer(encoded);
+
+        Object valueRead = decoder.readObject();
+        assertTrue(valueRead instanceof String);
+        String decodedString = (String) valueRead;
+        assertEquals(largeString, decodedString);
+    }
+}


Mime
View raw message