qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r1545693 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java test/java/org/apache/qpid/jms/engine/AmqpBytesMessageTest.java test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java
Date Tue, 26 Nov 2013 15:52:09 GMT
Author: robbie
Date: Tue Nov 26 15:52:08 2013
New Revision: 1545693

URL: http://svn.apache.org/r1545693
Log:
QPIDJMS-9: round out BytesMessage handling of amqp data section, add unit testing

Added:
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpBytesMessageTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java?rev=1545693&r1=1545692&r2=1545693&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java Tue Nov 26 15:52:08 2013
@@ -46,9 +46,13 @@ public class AmqpBytesMessage extends Am
     public void setBytes(byte[] bytes)
     {
         getMessage().setBody(new Data(new Binary(bytes)));
+        //TODO: set the content type in the case we received an amqp-value section containing binary
     }
 
-    public ByteArrayInputStream getByteArrayInputStream()
+    /**
+     * @throws IllegalStateException if the underlying message content can't be retrieved as bytes
+     */
+    public ByteArrayInputStream getByteArrayInputStream() throws IllegalStateException
     {
         Section body = getMessage().getBody();
 
@@ -66,7 +70,7 @@ public class AmqpBytesMessage extends Am
                 _length = 0;
                 return createEmptyByteArrayInputStream();
             }
-            if(value instanceof Binary)
+            else if(value instanceof Binary)
             {
                 Binary b = (Binary)value;
                 _length = b.getLength();
@@ -74,18 +78,26 @@ public class AmqpBytesMessage extends Am
             }
             else
             {
-                throw new RuntimeException("Unexpected body content type: " + value.getClass().getSimpleName());
+                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
             }
         }
         else if(body instanceof Data)
         {
             Binary b = ((Data) body).getValue();
-            _length = b.getLength();
-            return new ByteArrayInputStream(b.getArray(), b.getArrayOffset(), b.getLength());
+            if(b == null)
+            {
+                _length = 0;
+                return createEmptyByteArrayInputStream();
+            }
+            else
+            {
+                _length = b.getLength();
+                return new ByteArrayInputStream(b.getArray(), b.getArrayOffset(), b.getLength());
+            }
         }
         else
         {
-            throw new RuntimeException("Unexpected message body type: " + body.getClass().getSimpleName());
+            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
         }
     }
 
@@ -94,11 +106,12 @@ public class AmqpBytesMessage extends Am
         return new ByteArrayInputStream(new byte[0]);
     }
 
-    public long getBytesLength()
+    /**
+     * @throws IllegalStateException if the underlying message content can't be retrieved as bytes
+     */
+    public long getBytesLength() throws IllegalStateException
     {
         getByteArrayInputStream();
         return _length;
     }
-
-    //TODO: methods to access/set content
 }

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpBytesMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpBytesMessageTest.java?rev=1545693&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpBytesMessageTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpBytesMessageTest.java Tue Nov 26 15:52:08 2013
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class AmqpBytesMessageTest extends QpidJmsTestCase
+{
+    private AmqpConnection _mockAmqpConnection;
+    private Delivery _mockDelivery;
+
+    @Before
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+        _mockDelivery = Mockito.mock(Delivery.class);
+    }
+
+    @Test
+    public void testGetInputStreamWithNewMessageToSendReturnsEmptyInputStream() throws Exception
+    {
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage();
+
+        ByteArrayInputStream byteArrayInputStream = amqpBytesMessage.getByteArrayInputStream();
+        assertNotNull(byteArrayInputStream);
+
+        //try to read a byte, it should return -1 bytes read, i.e EOS.
+        assertEquals(-1, byteArrayInputStream.read(new byte[1]));
+    }
+
+    @Test
+    public void testGetInputStreamUsingReceivedMessageWithNoBodySectionReturnsEmptyInputStream() throws Exception
+    {
+        Message message = Proton.message();
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        ByteArrayInputStream byteArrayInputStream = amqpBytesMessage.getByteArrayInputStream();
+        assertNotNull(byteArrayInputStream);
+
+        //try to read a byte, it should return -1 bytes read, i.e EOS.
+        assertEquals(-1, byteArrayInputStream.read(new byte[1]));
+    }
+
+    @Test
+    public void testGetBytesLengthUsingNewMessageToSend() throws Exception
+    {
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage();
+
+        assertEquals(0, amqpBytesMessage.getBytesLength());
+    }
+
+    @Test
+    public void testNewMessageToSendHasContentTypeButNoBodySection() throws Exception
+    {
+        //TODO: this test assumes we can omit the body section. If we decide otherwise
+        //it should instead check for e.g. a data section containing 0 length binary
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage();
+        Message protonMessage = amqpBytesMessage.getMessage();
+
+        assertNotNull(protonMessage);
+        assertNull(protonMessage.getBody());
+
+        String contentType = protonMessage.getContentType();
+        assertNotNull("content type should be set", contentType);
+        assertEquals(AmqpBytesMessage.CONTENT_TYPE, contentType);
+    }
+
+    @Test
+    public void testGetBytesLengthUsingPopulatedMessageToSend() throws Exception
+    {
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage();
+
+        byte[] bytes = "myBytes".getBytes();
+        amqpBytesMessage.setBytes(bytes);
+
+        assertEquals(bytes.length, amqpBytesMessage.getBytesLength());
+    }
+
+    @Test
+    public void testGetBytesLengthUsingReceivedMessageWithDataSectionContainingNonZeroLengthBinary() throws Exception
+    {
+        Message message = Proton.message();
+        int length = 5;
+        message.setBody(new Data(new Binary(new byte[length])));
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        assertEquals(length, amqpBytesMessage.getBytesLength());
+    }
+
+    @Test
+    public void testGetBytesLengthUsingReceivedMessageWithAmqpValueSectionContainingNonZeroLengthBinary() throws Exception
+    {
+        Message message = Proton.message();
+        int length = 10;
+        message.setBody(new AmqpValue(new Binary(new byte[length])));
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        assertEquals(length, amqpBytesMessage.getBytesLength());
+    }
+
+    @Test
+    public void testInputStreamUsingReceivedMessageWithAmqpValueSectionContainingBinary() throws Exception
+    {
+        byte[] bytes = "myBytes".getBytes();
+
+        Message message = Proton.message();
+        message.setBody(new AmqpValue(new Binary(bytes)));
+
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+        ByteArrayInputStream bytesStream = amqpBytesMessage.getByteArrayInputStream();
+
+        //retrieve the expected bytes, check they match
+        byte[] receivedBytes = new byte[bytes.length];
+        bytesStream.read(receivedBytes);
+        assertTrue(Arrays.equals(bytes, receivedBytes));
+
+        //verify no more bytes remain, i.e EOS
+        assertEquals(-1, bytesStream.read(new byte[1]));
+    }
+
+    @Test
+    public void testInputStreamUsingReceivedMessageWithDataSection() throws Exception
+    {
+        byte[] bytes = "myBytes".getBytes();
+
+        Message message = Proton.message();
+        message.setBody(new Data(new Binary(bytes)));
+
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+        ByteArrayInputStream bytesStream = amqpBytesMessage.getByteArrayInputStream();
+        assertNotNull(bytesStream);
+
+        //retrieve the expected bytes, check they match
+        byte[] receivedBytes = new byte[bytes.length];
+        bytesStream.read(receivedBytes);
+        assertTrue(Arrays.equals(bytes, receivedBytes));
+
+        //verify no more bytes remain, i.e EOS
+        assertEquals(-1, bytesStream.read(new byte[1]));
+    }
+
+    @Test
+    public void testGetTextUsingReceivedMessageWithDataSectionContainingNullReturnsEmptyBAIS() throws Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new Data(null));
+
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+        ByteArrayInputStream bytesStream = amqpBytesMessage.getByteArrayInputStream();
+        assertNotNull(bytesStream);
+
+        assertEquals(0, amqpBytesMessage.getBytesLength());
+        assertEquals(-1, bytesStream.read(new byte[1]));
+    }
+
+    @Test
+    public void testGetMethodsWithNonAmqpValueNonDataSectionThrowsISE() throws Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new AmqpSequence(new ArrayList<Object>()));
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        try
+        {
+            amqpBytesMessage.getByteArrayInputStream();
+            fail("expected exception not thrown");
+        }
+        catch(IllegalStateException ise)
+        {
+            //expected
+        }
+
+        try
+        {
+            amqpBytesMessage.getBytesLength();
+            fail("expected exception not thrown");
+        }
+        catch(IllegalStateException ise)
+        {
+            //expected
+        }
+    }
+
+    @Test
+    public void testGetMethodsWithAmqpValueContainingNonNullNonBinaryValueThrowsISE() throws Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new AmqpValue(true));
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        try
+        {
+            amqpBytesMessage.getByteArrayInputStream();
+            fail("expected exception not thrown");
+        }
+        catch(IllegalStateException ise)
+        {
+            //expected
+        }
+
+        try
+        {
+            amqpBytesMessage.getBytesLength();
+            fail("expected exception not thrown");
+        }
+        catch(IllegalStateException ise)
+        {
+            //expected
+        }
+    }
+}

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java?rev=1545693&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java Tue Nov 26 15:52:08 2013
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpBytesMessage;
+import org.apache.qpid.jms.engine.AmqpConnection;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class BytesMessageImplTest extends QpidJmsTestCase
+{
+    private Delivery _mockDelivery;
+    private ConnectionImpl _mockConnectionImpl;
+    private SessionImpl _mockSessionImpl;
+    private AmqpConnection _mockAmqpConnection;
+
+    @Before
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+        _mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
+        _mockSessionImpl = Mockito.mock(SessionImpl.class);
+    }
+
+    @Test
+    public void testGetBodyLengthUsingReceivedMessageWithNoBodySection() throws Exception
+    {
+        Message message = Proton.message();
+        AmqpBytesMessage testAmqpMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl);
+
+        assertEquals(0, bytesMessageImpl.getBodyLength());
+    }
+
+    @Test
+    public void testReadBytesUsingReceivedMessageWithNoBodySectionReturnsEOS() throws Exception
+    {
+        Message message = Proton.message();
+        AmqpBytesMessage testAmqpMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl);
+
+        //verify attempting to read bytes returns -1, i.e EOS
+        assertEquals(-1, bytesMessageImpl.readBytes(new byte[2]));
+    }
+
+    @Test
+    public void testReadBytesUsingReceivedMessageWithDataSectionReturnsBytes() throws Exception
+    {
+        byte[] bytes = "myBytesData".getBytes();
+
+        Message message = Proton.message();
+        message.setBody(new Data(new Binary(bytes)));
+
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl);
+
+        //retrieve the expected bytes, check they match
+        byte[] receivedBytes = new byte[bytes.length];
+        bytesMessageImpl.readBytes(receivedBytes);
+        assertTrue(Arrays.equals(bytes, receivedBytes));
+
+        //verify no more bytes remain, i.e EOS
+        assertEquals(-1, bytesMessageImpl.readBytes(receivedBytes));
+
+        assertEquals(bytes.length, bytesMessageImpl.getBodyLength());
+    }
+
+    @Test
+    public void testReadBytesUsingReceivedMessageWithAmpValueSectionReturnsBytes() throws Exception
+    {
+        byte[] bytes = "myBytesAmqpValue".getBytes();
+
+        Message message = Proton.message();
+        message.setBody(new AmqpValue(new Binary(bytes)));
+
+        AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
+        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl);
+
+        //retrieve the expected bytes, check they match
+        byte[] receivedBytes = new byte[bytes.length];
+        bytesMessageImpl.readBytes(receivedBytes);
+        assertTrue(Arrays.equals(bytes, receivedBytes));
+
+        //verify no more bytes remain, i.e EOS
+        assertEquals(-1, bytesMessageImpl.readBytes(receivedBytes));
+
+        assertEquals(bytes.length, bytesMessageImpl.getBodyLength());
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message