activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-775 Tests for AMQP credit handling
Date Fri, 07 Oct 2016 22:40:57 GMT
ARTEMIS-775 Tests for AMQP credit handling

Some new tests that cover some AMQP credit handling scenarios.  Test
case 'testCloseBusyReceiver' currently fails due to being dispatched a
duplicate message.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bc6ad1df
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bc6ad1df
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bc6ad1df

Branch: refs/heads/master
Commit: bc6ad1dfdbada759e06c50107bbc7bbee2c09a15
Parents: 07ab1be
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Oct 7 12:09:28 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Oct 7 18:40:49 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      |   6 +-
 .../integration/amqp/AmqpSendReceiveTest.java   | 557 +++++++++++++++++++
 2 files changed, 560 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc6ad1df/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 2d95d29..3ca17a6 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -269,7 +269,7 @@ public class AmqpMessage {
     * @return the set message ID in String form or null if not set.
     */
    public String getMessageId() {
-      if (message.getProperties() == null) {
+      if (message.getProperties() == null || message.getProperties().getMessageId() == null) {
          return null;
       }
 
@@ -319,7 +319,7 @@ public class AmqpMessage {
     * @return the set correlation ID in String form or null if not set.
     */
    public String getCorrelationId() {
-      if (message.getProperties() == null) {
+      if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) {
          return null;
       }
 
@@ -394,7 +394,7 @@ public class AmqpMessage {
     * @return true if the message is marked as being durable.
     */
    public boolean isDurable() {
-      if (message.getHeader() == null) {
+      if (message.getHeader() == null || message.getHeader().getDurable() == null) {
          return false;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bc6ad1df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
new file mode 100644
index 0000000..6597a62
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ */
+public class AmqpSendReceiveTest extends AmqpClientTestSupport {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
+
+   @Test(timeout = 60000)
+   public void testSimpleSendOneReceiveOne() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      AmqpMessage message = new AmqpMessage();
+
+      message.setMessageId("msg" + 1);
+      message.setMessageAnnotation("serialNo", 1);
+      message.setText("Test-Message");
+
+      sender.send(message);
+      sender.close();
+
+      LOG.info("Attempting to read message with receiver");
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(2);
+      AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+      assertNotNull("Should have read message", received);
+      assertEquals("msg1", received.getMessageId());
+      received.accept();
+
+      receiver.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testCloseBusyReceiver() throws Exception {
+      final int MSG_COUNT = 20;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      AmqpSender sender = session.createSender(getTestName());
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+
+         message.setMessageId("msg" + i);
+         message.setMessageAnnotation("serialNo", i);
+         message.setText("Test-Message");
+
+         System.out.println("Sending message: " + message.getMessageId());
+
+         sender.send(message);
+      }
+
+      sender.close();
+
+      Queue queue = getProxyToQueue(getTestName());
+      assertEquals(MSG_COUNT, queue.getMessageCount());
+
+      AmqpReceiver receiver1 = session.createReceiver(getTestName());
+      receiver1.flow(MSG_COUNT);
+      AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS);
+      assertNotNull("Should have got a message", received);
+      assertEquals("msg0", received.getMessageId());
+      receiver1.close();
+
+      AmqpReceiver receiver2 = session.createReceiver(getTestName());
+      receiver2.flow(200);
+      for (int i = 0; i < MSG_COUNT; ++i) {
+         received = receiver2.receive(5, TimeUnit.SECONDS);
+         assertNotNull("Should have got a message", received);
+         System.out.println("Read message: " + received.getMessageId());
+         assertEquals("msg" + i, received.getMessageId());
+      }
+
+      receiver2.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiveWithJMSSelectorFilter() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpMessage message1 = new AmqpMessage();
+      message1.setGroupId("abcdefg");
+      message1.setApplicationProperty("sn", 100);
+
+      AmqpMessage message2 = new AmqpMessage();
+      message2.setGroupId("hijklm");
+      message2.setApplicationProperty("sn", 200);
+
+      AmqpSender sender = session.createSender(getTestName());
+      sender.send(message1);
+      sender.send(message2);
+      sender.close();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100");
+      receiver.flow(2);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull("Should have read a message", received);
+      assertEquals(100, received.getApplicationProperty("sn"));
+      assertEquals("abcdefg", received.getGroupId());
+      received.accept();
+
+      assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testAdvancedLinkFlowControl() throws Exception {
+      final int MSG_COUNT = 20;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+
+         message.setMessageId("msg" + i);
+         message.setMessageAnnotation("serialNo", i);
+         message.setText("Test-Message");
+
+         sender.send(message);
+      }
+
+      sender.close();
+
+      LOG.info("Attempting to read first two messages with receiver #1");
+      AmqpReceiver receiver1 = session.createReceiver(getTestName());
+      receiver1.flow(2);
+      AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+      AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
+      assertNotNull("Should have read message 1", message1);
+      assertNotNull("Should have read message 2", message2);
+      assertEquals("msg0", message1.getMessageId());
+      assertEquals("msg1", message2.getMessageId());
+      message1.accept();
+      message2.accept();
+
+      LOG.info("Attempting to read next two messages with receiver #2");
+      AmqpReceiver receiver2 = session.createReceiver(getTestName());
+      receiver2.flow(2);
+      AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
+      AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+      assertNotNull("Should have read message 3", message3);
+      assertNotNull("Should have read message 4", message4);
+      assertEquals("msg2", message3.getMessageId());
+      assertEquals("msg3", message4.getMessageId());
+      message3.accept();
+      message4.accept();
+
+      LOG.info("Attempting to read remaining messages with receiver #1");
+      receiver1.flow(MSG_COUNT - 4);
+      for (int i = 4; i < MSG_COUNT; i++) {
+         AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+         assertNotNull("Should have read a message", message);
+         assertEquals("msg" + i, message.getMessageId());
+         message.accept();
+      }
+
+      receiver1.close();
+      receiver2.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testDispatchOrderWithPrefetchOfOne() throws Exception {
+      final int MSG_COUNT = 20;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+
+         message.setMessageId("msg" + i);
+         message.setMessageAnnotation("serialNo", i);
+         message.setText("Test-Message");
+
+         sender.send(message);
+      }
+
+      sender.close();
+
+      AmqpReceiver receiver1 = session.createReceiver(getTestName());
+      receiver1.flow(1);
+
+      AmqpReceiver receiver2 = session.createReceiver(getTestName());
+      receiver2.flow(1);
+
+      AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+      AmqpMessage message2 = receiver2.receive(10, TimeUnit.SECONDS);
+      assertNotNull("Should have read message 1", message1);
+      assertNotNull("Should have read message 2", message2);
+      assertEquals("msg0", message1.getMessageId());
+      assertEquals("msg1", message2.getMessageId());
+      message1.accept();
+      message2.accept();
+
+      receiver1.flow(1);
+      AmqpMessage message3 = receiver1.receive(10, TimeUnit.SECONDS);
+      receiver2.flow(1);
+      AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+      assertNotNull("Should have read message 3", message3);
+      assertNotNull("Should have read message 4", message4);
+      assertEquals("msg2", message3.getMessageId());
+      assertEquals("msg3", message4.getMessageId());
+      message3.accept();
+      message4.accept();
+
+      LOG.info("*** Attempting to read remaining messages with both receivers");
+      int splitCredit = (MSG_COUNT - 4) / 2;
+
+      LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
+      receiver1.flow(splitCredit);
+      for (int i = 0; i < splitCredit; i++) {
+         AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+         assertNotNull("Receiver #1 should have read a message", message);
+         LOG.info("Receiver #1 read message: {}", message.getMessageId());
+         message.accept();
+      }
+
+      LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
+      receiver2.flow(splitCredit);
+      for (int i = 0; i < splitCredit; i++) {
+         AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
+         assertNotNull("Receiver #2 should have read message[" + i + "]", message);
+         LOG.info("Receiver #2 read message: {}", message.getMessageId());
+         message.accept();
+      }
+
+      receiver1.close();
+      receiver2.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiveMessageAndRefillCreditBeforeAccept() throws Exception {
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      final String address = getTestName();
+
+      AmqpReceiver receiver = session.createReceiver(address);
+      AmqpSender sender = session.createSender(address);
+
+      for (int i = 0; i < 2; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setMessageId("msg" + i);
+         sender.send(message);
+      }
+      sender.close();
+
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+
+      receiver.flow(1);
+      received.accept();
+
+      received = receiver.receive(10, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.accept();
+
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueueAsync() throws Exception {
+      final AmqpClient client = createAmqpClient();
+      final LinkedList<Throwable> errors = new LinkedList<>();
+      final CountDownLatch receiverReady = new CountDownLatch(1);
+      ExecutorService executorService = Executors.newCachedThreadPool();
+
+      final String address = getTestName();
+
+      executorService.submit(new Runnable() {
+         @Override
+         public void run() {
+            try {
+               LOG.info("Starting consumer connection");
+               AmqpConnection connection = addConnection(client.connect());
+               AmqpSession session = connection.createSession();
+               AmqpReceiver receiver = session.createReceiver(address);
+               receiver.flow(1);
+               receiverReady.countDown();
+               AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+               assertNotNull(received);
+
+               receiver.flow(1);
+               received.accept();
+
+               received = receiver.receive(5, TimeUnit.SECONDS);
+               assertNotNull(received);
+               received.accept();
+
+               receiver.close();
+               connection.close();
+
+            } catch (Exception error) {
+               errors.add(error);
+            }
+         }
+      });
+
+      // producer
+      executorService.submit(new Runnable() {
+         @Override
+         public void run() {
+            try {
+               receiverReady.await(20, TimeUnit.SECONDS);
+               AmqpConnection connection = addConnection(client.connect());
+               AmqpSession session = connection.createSession();
+
+               AmqpSender sender = session.createSender(address);
+               for (int i = 0; i < 2; i++) {
+                  AmqpMessage message = new AmqpMessage();
+                  message.setMessageId("msg" + i);
+                  sender.send(message);
+               }
+               sender.close();
+               connection.close();
+            } catch (Exception ignored) {
+               ignored.printStackTrace();
+            }
+         }
+      });
+
+      executorService.shutdown();
+      executorService.awaitTermination(20, TimeUnit.SECONDS);
+      assertTrue("no errors: " + errors, errors.isEmpty());
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageDurabliltyFollowsSpec() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpReceiver receiver1 = session.createReceiver(getTestName());
+
+      Queue queue = getProxyToQueue(getTestName());
+
+      // Create default message that should be sent as non-durable
+      AmqpMessage message1 = new AmqpMessage();
+      message1.setText("Test-Message -> non-durable");
+      message1.setDurable(false);
+      message1.setMessageId("ID:Message:1");
+      sender.send(message1);
+
+      assertEquals(1, queue.getMessageCount());
+      receiver1.flow(1);
+      message1 = receiver1.receive(50, TimeUnit.SECONDS);
+      assertNotNull("Should have read a message", message1);
+      assertFalse("First message sent should not be durable", message1.isDurable());
+      message1.accept();
+
+      // Create a message explicitly marked durable that should be sent as durable
+      AmqpMessage message2 = new AmqpMessage();
+      message2.setText("Test-Message -> durable");
+      message2.setDurable(true);
+      message2.setMessageId("ID:Message:2");
+      sender.send(message2);
+
+      assertEquals(1, queue.getMessageCount());
+      receiver1.flow(1);
+      message2 = receiver1.receive(50, TimeUnit.SECONDS);
+      assertNotNull("Should have read a message", message2);
+      assertTrue("Second message sent should be durable", message2.isDurable());
+      message2.accept();
+
+      sender.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
+      final int MSG_COUNT = 50;
+
+      AmqpClient client = createAmqpClient();
+
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      final String address = getTestName();
+
+      AmqpReceiver receiver = session.createReceiver(address);
+      AmqpSender sender = session.createSender(address);
+
+      final Queue destinationView = getProxyToQueue(address);
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setMessageId("msg" + i);
+         sender.send(message);
+      }
+
+      List<AmqpMessage> pendingAcks = new ArrayList<>();
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         receiver.flow(1);
+         AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull(received);
+         pendingAcks.add(received);
+      }
+
+      // Send one more to check in-flight stays at zero with no credit and all
+      // pending messages settled.
+      AmqpMessage message = new AmqpMessage();
+      message.setMessageId("msg-final");
+      sender.send(message);
+
+      for (AmqpMessage pendingAck : pendingAcks) {
+         pendingAck.accept();
+      }
+
+      assertTrue("Should be no inflight messages: " + destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() {
+
+         @Override
+         public boolean isSatisified() throws Exception {
+            return destinationView.getDeliveringCount() == 0;
+         }
+      }));
+
+      sender.close();
+      receiver.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testTwoPresettledReceiversReceiveAllMessages() throws Exception {
+      final int MSG_COUNT = 100;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      final String address = getTestName();
+
+      AmqpSender sender = session.createSender(address);
+      AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
+      AmqpReceiver receiver2 = session.createReceiver(address, null, false, true);
+
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setMessageId("msg" + i);
+         sender.send(message);
+      }
+
+      LOG.info("Attempting to read first two messages with receiver #1");
+      receiver1.flow(2);
+      AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+      AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
+      assertNotNull("Should have read message 1", message1);
+      assertNotNull("Should have read message 2", message2);
+      assertEquals("msg0", message1.getMessageId());
+      assertEquals("msg1", message2.getMessageId());
+      message1.accept();
+      message2.accept();
+
+      LOG.info("Attempting to read next two messages with receiver #2");
+      receiver2.flow(2);
+      AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
+      AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+      assertNotNull("Should have read message 3", message3);
+      assertNotNull("Should have read message 4", message4);
+      assertEquals("msg2", message3.getMessageId());
+      assertEquals("msg3", message4.getMessageId());
+      message3.accept();
+      message4.accept();
+
+      LOG.info("*** Attempting to read remaining messages with both receivers");
+      int splitCredit = (MSG_COUNT - 4) / 2;
+
+      LOG.info("**** Receiver #1 granting credit[{}] for its block of messages", splitCredit);
+      receiver1.flow(splitCredit);
+      for (int i = 0; i < splitCredit; i++) {
+         AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+         assertNotNull("Receiver #1 should have read a message", message);
+         LOG.info("Receiver #1 read message: {}", message.getMessageId());
+         message.accept();
+      }
+
+      LOG.info("**** Receiver #2 granting credit[{}] for its block of messages", splitCredit);
+      receiver2.flow(splitCredit);
+      for (int i = 0; i < splitCredit; i++) {
+         AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
+         assertNotNull("Receiver #2 should have read a message[" + i + "]", message);
+         LOG.info("Receiver #2 read message: {}", message.getMessageId());
+         message.accept();
+      }
+
+      receiver1.close();
+      receiver2.close();
+
+      connection.close();
+   }
+}


Mime
View raw message