activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6422
Date Fri, 09 Sep 2016 22:34:21 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 2fdc2600a -> da9fedead


https://issues.apache.org/jira/browse/AMQ-6422

Adds a split consumer test that uses presettled receivers.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/da9fedea
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/da9fedea
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/da9fedea

Branch: refs/heads/master
Commit: da9fedead4078cc82efb32e15d8d9cd53c8e82dc
Parents: 2fdc260
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Sep 9 18:34:03 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Sep 9 18:34:03 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpSendReceiveTest.java       | 79 ++++++++++++++++++++
 1 file changed, 79 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/da9fedea/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index f39fc3e..3132e6e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -615,4 +615,83 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         receiver.close();
         connection.close();
     }
+
+    @Test(timeout = 60000)
+    public void testTwoPresettledReceiversReceiveAllMessages() throws Exception {
+        final int MSG_COUNT = 100;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        final String address = "queue://" + getTestName();
+
+        AmqpSender sender = session.createSender(address);
+        AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
+        AmqpReceiver receiver2 = session.createReceiver(address, null, false, true);
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("msg" + i);
+            sender.send(message);
+        }
+
+        final DestinationViewMBean destinationView = getProxyToQueue(getTestName());
+
+        LOG.info("Attempting to read first two messages with receiver #1");
+        receiver1.flow(2);
+        AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message 1", message1);
+        assertNotNull("Should have read message 2", message2);
+        assertEquals("msg0", message1.getMessageId());
+        assertEquals("msg1", message2.getMessageId());
+        message1.accept();
+        message2.accept();
+
+        LOG.info("Attempting to read next two messages with receiver #2");
+        receiver2.flow(2);
+        AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
+        AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message 3", message3);
+        assertNotNull("Should have read message 4", message4);
+        assertEquals("msg2", message3.getMessageId());
+        assertEquals("msg3", message4.getMessageId());
+        message3.accept();
+        message4.accept();
+
+        assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(),
Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destinationView.getInFlightCount() == 0;
+            }
+        }));
+
+        LOG.info("*** Attempting to read remaining messages with both receivers");
+        int splitCredit = (MSG_COUNT - 4) / 2;
+
+        LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
+        receiver1.flow(splitCredit);
+        for (int i = 0; i < splitCredit; i++) {
+            AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+            assertNotNull("Receiver #1 should have read a message", message);
+            LOG.info("Receiver #1 read message: {}", message.getMessageId());
+            message.accept();
+        }
+
+        LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
+        receiver2.flow(splitCredit);
+        for (int i = 0; i < splitCredit; i++) {
+            AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
+            assertNotNull("Receiver #2 should have read a message", message);
+            LOG.info("Receiver #2 read message: {}", message.getMessageId());
+            message.accept();
+        }
+
+        receiver1.close();
+        receiver2.close();
+
+        connection.close();
+    }
 }


Mime
View raw message