activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-267 Add test for handling of AMQP header and durability
Date Wed, 03 May 2017 20:33:17 GMT
ARTEMIS-267 Add test for handling of AMQP header and durability

Adds a test that validates that messages that are either lacking a
header or are set to be non-durable are not persisted and are not
recovered on broker restart.  

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cf3e2bf7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cf3e2bf7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cf3e2bf7

Branch: refs/heads/master
Commit: cf3e2bf7f06df833d22f4ae55e7b1308d1800de5
Parents: 0146109
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed May 3 14:42:11 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed May 3 16:33:12 2017 -0400

----------------------------------------------------------------------
 .../integration/amqp/AmqpSendReceiveTest.java   | 75 +++++++++++++++++++-
 1 file changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cf3e2bf7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 0cae79f..9e5b03d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -679,7 +679,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       assertFalse("First message sent should not be durable", message1.isDurable());
       message1.accept();
 
-      // Create default message that should be sent as non-durable
+      // Create default message that should be sent as durable
       AmqpMessage message2 = new AmqpMessage();
       message2.setText("Test-Message -> durable");
       message2.setDurable(true);
@@ -698,6 +698,79 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testMessageWithHeaderMarkedDurableIsPersisted() throws Exception {
+      doTestBrokerRestartAndDurability(true, true);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageWithHeaderMarkedNonDurableIsNotPersisted() throws Exception {
+      doTestBrokerRestartAndDurability(false, true);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageWithNoHeaderIsNotPersisted() throws Exception {
+      doTestBrokerRestartAndDurability(false, false);
+   }
+
+   private void doTestBrokerRestartAndDurability(boolean durable, boolean enforceHeader)
throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getQueueName());
+
+      final Queue queueView1 = getProxyToQueue(getQueueName());
+
+      // Create default message that should be sent as non-durable
+      AmqpMessage message = new AmqpMessage();
+      message.setText("Test-Message -> non-durable");
+      message.setMessageId("ID:Message:1");
+
+      if (durable) {
+         message.setDurable(true);
+      } else {
+         if (enforceHeader) {
+            message.setDurable(false);
+            assertNotNull(message.getWrappedMessage().getHeader());
+         } else {
+            assertNull(message.getWrappedMessage().getHeader());
+         }
+      }
+
+      sender.send(message);
+      connection.close();
+
+      assertTrue("Message did not arrive", Wait.waitFor(() -> queueView1.getMessageCount()
== 1));
+
+      // Restart the server and the Queue should be empty
+      server.stop();
+      server.start();
+
+      // Reconnect now
+      connection = addConnection(client.connect());
+      session = connection.createSession();
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+      final Queue queueView2 = getProxyToQueue(getQueueName());
+      if (durable) {
+         assertTrue("Message should not have returned", Wait.waitFor(() -> queueView2.getMessageCount()
== 1));
+      } else {
+         assertTrue("Message should have been restored", Wait.waitFor(() -> queueView2.getMessageCount()
== 0));
+      }
+
+      receiver.flow(1);
+      message = receiver.receive(1, TimeUnit.SECONDS);
+
+      if (durable) {
+         assertNotNull("Should have read a message", message);
+      } else {
+         assertNull("Should not have read a message", message);
+      }
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
    public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
       final int MSG_COUNT = 50;
 


Mime
View raw message