activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-331 support 0-length large msg
Date Tue, 05 Jan 2016 15:54:06 GMT
ARTEMIS-331 support 0-length large msg


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/978f8eed
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/978f8eed
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/978f8eed

Branch: refs/heads/master
Commit: 978f8eeda8c1a597402363e62c1f26e2a5cce9b7
Parents: 2838128
Author: jbertram <jbertram@apache.org>
Authored: Mon Jan 4 14:44:09 2016 -0600
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jan 5 10:44:54 2016 -0500

----------------------------------------------------------------------
 .../core/server/impl/ServerConsumerImpl.java    |   9 +-
 .../integration/client/LargeMessageTest.java    | 150 +++++++++++++++++++
 2 files changed, 158 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/978f8eed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 7d54d31..7936c76 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1036,7 +1036,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
 
                context.encode(bodyBuffer, localChunkLen);
 
-               byte[] body = bodyBuffer.toByteBuffer().array();
+               byte[] body;
+
+               if (bodyBuffer.toByteBuffer().hasArray()) {
+                  body = bodyBuffer.toByteBuffer().array();
+               }
+               else {
+                  body = new byte[0];
+               }
 
                int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this,
body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/978f8eed/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index d99ba63..bc94ab3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2104,6 +2104,156 @@ public class LargeMessageTest extends LargeMessageTestBase {
       }
    }
 
+   // https://issues.apache.org/jira/browse/ARTEMIS-331
+   @Test
+   public void testSendStreamingSingleEmptyMessage() throws Exception {
+      final String propertyName = "myStringPropertyName";
+      final String propertyValue = "myStringPropertyValue";
+      ClientSession session = null;
+      ActiveMQServer server = null;
+
+      final int SIZE = 0;
+      try {
+
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         locator.setMinLargeMessageSize(100 * 1024);
+
+         ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+         session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true);
+
+         ClientMessage clientFile = session.createMessage(true);
+         clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
+         clientFile.putStringProperty(propertyName, propertyValue);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         session.start();
+
+         log.debug("Sending");
+         producer.send(clientFile);
+
+         producer.close();
+
+         log.debug("Waiting");
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         ClientMessage msg2 = consumer.receive(10000);
+
+         msg2.acknowledge();
+
+         msg2.setOutputStream(createFakeOutputStream());
+         Assert.assertTrue(msg2.waitOutputStreamCompletion(60000));
+         Assert.assertEquals(propertyValue, msg2.getStringProperty(propertyName));
+
+         session.commit();
+
+         Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+         Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
+
+      }
+      finally {
+         try {
+            session.close();
+         }
+         catch (Throwable ignored) {
+         }
+
+         try {
+            server.stop();
+         }
+         catch (Throwable ignored) {
+         }
+      }
+   }
+
+   // https://issues.apache.org/jira/browse/ARTEMIS-331
+   @Test
+   public void testSendStreamingEmptyMessagesWithRestart() throws Exception {
+      final String propertyName = "myStringPropertyName";
+      final String propertyValue = "myStringPropertyValue";
+      ClientSession session = null;
+      ActiveMQServer server = null;
+
+      final int SIZE = 0;
+      try {
+
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         locator.setMinLargeMessageSize(100 * 1024);
+
+         ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+         session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < 10; i++) {
+            ClientMessage clientFile = session.createMessage(true);
+            clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
+            clientFile.putStringProperty(propertyName, propertyValue + i);
+            producer.send(clientFile);
+         }
+
+         producer.close();
+
+         session.close();
+
+         sf.close();
+
+         server.stop();
+
+         server.start();
+
+         sf = addSessionFactory(createSessionFactory(locator));
+
+         session = sf.createSession(null, null, false, true, true, false, 0);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < 10; i++) {
+            ClientMessage msg2 = consumer.receive(10000);
+
+            msg2.acknowledge();
+
+            msg2.setOutputStream(createFakeOutputStream());
+            Assert.assertTrue(msg2.waitOutputStreamCompletion(60000));
+            Assert.assertEquals(propertyValue + i, msg2.getStringProperty(propertyName));
+
+            session.commit();
+         }
+
+         Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+         Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
+
+      }
+      finally {
+         try {
+            session.close();
+         }
+         catch (Throwable ignored) {
+         }
+
+         try {
+            server.stop();
+         }
+         catch (Throwable ignored) {
+         }
+      }
+   }
+
    /**
     * Receive messages but never reads them, leaving the buffer pending
     */


Mime
View raw message