activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6305
Date Fri, 03 Jun 2016 21:07:39 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 857597ca9 -> db71b43b1


https://issues.apache.org/jira/browse/AMQ-6305

Lower prefetch on settlement to allow for dispatch of the full amount of
granted credit.  Adds additional tests.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/db71b43b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/db71b43b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/db71b43b

Branch: refs/heads/master
Commit: db71b43b19e24019fe7ef08dd80d53998b9eec9b
Parents: 857597c
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Jun 3 17:05:00 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Jun 3 17:05:00 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSender.java     |  21 ++--
 .../transport/amqp/client/AmqpSession.java      |  21 ++++
 .../amqp/interop/AmqpReceiverTest.java          |  32 ++++++
 .../amqp/interop/AmqpSendReceiveTest.java       | 101 +++++++++++++++++--
 .../src/test/resources/log4j.properties         |   2 +-
 5 files changed, 159 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/db71b43b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 6c30828..567c507 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -427,16 +427,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                         LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
                         getEndpoint().drained();
                         draining = false;
-                    } else {
-                        LOG.trace("Sender:[{}] updating conumser prefetch:{} after dispatch.",
-                                  getEndpoint().getName(), getEndpoint().getCredit());
-
-                        ConsumerControl control = new ConsumerControl();
-                        control.setConsumerId(getConsumerId());
-                        control.setDestination(getDestination());
-                        control.setPrefetch(Math.max(0, getEndpoint().getCredit() - 1));
-
-                        sendToActiveMQ(control);
                     }
 
                     jms.setRedeliveryCounter(md.getRedeliveryCounter());
@@ -467,6 +457,17 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             tagCache.returnTag(tag);
         }
 
+        int newCredit = Math.max(0, getEndpoint().getCredit() - 1);
+        LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.",
+                  getEndpoint().getName(), newCredit);
+
+        ConsumerControl control = new ConsumerControl();
+        control.setConsumerId(getConsumerId());
+        control.setDestination(getDestination());
+        control.setPrefetch(newCredit);
+
+        sendToActiveMQ(control);
+
         if (ackType == -1) {
             // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
             delivery.settle();

http://git-wip-us.apache.org/repos/asf/activemq/blob/db71b43b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index e327534..6ed7861 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -179,12 +179,33 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
      * @throws Exception if an error occurs while creating the receiver.
      */
     public AmqpReceiver createReceiver(String address, String selector, boolean noLocal) throws Exception {
+        return createReceiver(address, selector, noLocal, false);
+    }
+
+    /**
+     * Create a receiver instance using the given address
+     *
+     * @param address
+     *        the address to which the receiver will subscribe for its messages.
+     * @param selector
+     *        the JMS selector to use for the subscription
+     * @param noLocal
+     *        should the subscription have messages from its connection filtered.
+     * @param presettle
+     *        should the receiver be created with a settled sender mode.
+     *
+     * @return a newly created receiver that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the receiver.
+     */
+    public AmqpReceiver createReceiver(String address, String selector, boolean noLocal, boolean presettle) throws Exception {
         checkClosed();
 
         final ClientFuture request = new ClientFuture();
         final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId());
 
         receiver.setNoLocal(noLocal);
+        receiver.setPresettle(presettle);
         if (selector != null && !selector.isEmpty()) {
             receiver.setSelector(selector);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/db71b43b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 1226f3e..c68e850 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.junit.ActiveMQTestRunner;
+import org.apache.activemq.junit.Repeat;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -46,10 +48,12 @@ import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 /**
  * Test various behaviors of AMQP receivers with the broker.
  */
+@RunWith(ActiveMQTestRunner.class)
 public class AmqpReceiverTest extends AmqpClientTestSupport {
 
     @Override
@@ -193,6 +197,34 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    @Repeat(repetitions = 1)
+    public void testPresettledReceiverReadsAllMessages() throws Exception {
+        final int MSG_COUNT = 100;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        receiver.flow(MSG_COUNT);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        }
+        receiver.close();
+
+        assertEquals(0, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 1)
     public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
         int MSG_COUNT = 4;
         sendMessages(getTestName(), MSG_COUNT, false);

http://git-wip-us.apache.org/repos/asf/activemq/blob/db71b43b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index 4dd2f0c..b9dda61 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.junit.ActiveMQTestRunner;
+import org.apache.activemq.junit.Repeat;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -33,12 +35,18 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test basic send and receive scenarios using only AMQP sender and receiver links.
  */
+@RunWith(ActiveMQTestRunner.class)
 public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
+    protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
+
     @Test(timeout = 60000)
     public void testCloseBusyReceiver() throws Exception {
         final int MSG_COUNT = 20;
@@ -113,9 +121,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         assertNull(receiver.receive(1, TimeUnit.SECONDS));
 
         receiver.close();
+        connection.close();
     }
 
-    @Test(timeout = 30000)
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 1)
     public void testAdvancedLinkFlowControl() throws Exception {
         final int MSG_COUNT = 20;
 
@@ -137,10 +147,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
         sender.close();
 
+        LOG.info("Attempting to read first two messages with receiver #1");
         AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
         receiver1.flow(2);
-        AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
-        AmqpMessage message2 = receiver1.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
         assertNotNull("Should have read message 1", message1);
         assertNotNull("Should have read message 2", message2);
         assertEquals("msg0", message1.getMessageId());
@@ -148,10 +159,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         message1.accept();
         message2.accept();
 
+        LOG.info("Attempting to read next two messages with receiver #2");
         AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
         receiver2.flow(2);
-        AmqpMessage message3 = receiver2.receive(5, TimeUnit.SECONDS);
-        AmqpMessage message4 = receiver2.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
+        AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
         assertNotNull("Should have read message 3", message3);
         assertNotNull("Should have read message 4", message4);
         assertEquals("msg2", message3.getMessageId());
@@ -159,9 +171,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         message3.accept();
         message4.accept();
 
+        LOG.info("Attempting to read remaining messages with receiver #1");
         receiver1.flow(MSG_COUNT - 4);
         for (int i = 4; i < MSG_COUNT - 4; i++) {
-            AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
+            AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
             assertNotNull("Should have read a message", message);
             assertEquals("msg" + i, message.getMessageId());
             message.accept();
@@ -169,6 +182,80 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
         receiver1.close();
         receiver2.close();
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 1)
+    public void testDispatchOrderWithPrefetchOfOne() throws Exception {
+        final int MSG_COUNT = 20;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            AmqpMessage message = new AmqpMessage();
+
+            message.setMessageId("msg" + i);
+            message.setMessageAnnotation("serialNo", i);
+            message.setText("Test-Message");
+
+            sender.send(message);
+        }
+
+        sender.close();
+
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+        receiver1.flow(1);
+
+        AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+        receiver2.flow(1);
+
+        AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver2.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message 1", message1);
+        assertNotNull("Should have read message 2", message2);
+        assertEquals("msg0", message1.getMessageId());
+        assertEquals("msg1", message2.getMessageId());
+        message1.accept();
+        message2.accept();
+
+        receiver1.flow(1);
+        AmqpMessage message3 = receiver1.receive(10, TimeUnit.SECONDS);
+        receiver2.flow(1);
+        AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message 3", message3);
+        assertNotNull("Should have read message 4", message4);
+        assertEquals("msg2", message3.getMessageId());
+        assertEquals("msg3", message4.getMessageId());
+        message3.accept();
+        message4.accept();
+
+        LOG.info("Attempting to read remaining messages with both receivers");
+        int splitCredit = (MSG_COUNT - 4) / 2;
+
+        receiver1.flow(splitCredit);
+        for (int i = 4; i < splitCredit; i++) {
+            AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+            assertNotNull("Should have read a message", message);
+            message.accept();
+        }
+
+        receiver2.flow(splitCredit);
+        for (int i = 4; i < splitCredit; i++) {
+            AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
+            assertNotNull("Should have read a message", message);
+            message.accept();
+        }
+
+        receiver1.close();
+        receiver2.close();
+
+        connection.close();
     }
 
     @Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/activemq/blob/db71b43b/activemq-amqp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties
index 5208b3f..d25017d 100755
--- a/activemq-amqp/src/test/resources/log4j.properties
+++ b/activemq-amqp/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.transport.amqp=INFO
+log4j.logger.org.apache.activemq.transport.amqp=DEBUG
 log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
 log4j.logger.org.fusesource=INFO
 


Mime
View raw message