activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [1/2] activemq-artemis git commit: Add default AMQP flow behaviour and fix proton test
Date Tue, 26 Jul 2016 11:30:26 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 110158bb8 -> 6cb681555


Add default AMQP flow behaviour and fix proton test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b549bb24
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b549bb24
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b549bb24

Branch: refs/heads/master
Commit: b549bb243c2b4536f2ca9d84e777cca9bff019b9
Parents: 110158b
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue Jul 26 11:11:33 2016 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Tue Jul 26 11:14:25 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |   1 +
 .../context/AbstractProtonReceiverContext.java  |  11 +-
 .../tests/integration/proton/ProtonTest.java    | 115 +++++++++++++------
 3 files changed, 92 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b549bb24/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index ab57fe1..b2d029f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -405,6 +405,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
             public void run() {
                if (receiver.getRemoteCredit() < threshold) {
                   receiver.flow(credits);
+                  connection.flush();
                }
             }
          });

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b549bb24/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
index 5a43029..c210950 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -58,10 +58,17 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
    }
 
    public void flow(int credits, int threshold) {
-      synchronized (connection.getLock()) {
+      // Use the SessionSPI to allocate producer credits, or default, always allocate credit.
+      if (sessionSPI != null) {
          sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
       }
-      connection.flush();
+      else {
+         synchronized (connection.getLock()) {
+            receiver.flow(credits);
+            connection.flush();
+         }
+      }
+
    }
 
    public void drain(int credits) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b549bb24/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 8874271..2c68dde 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -45,7 +45,9 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -136,9 +138,9 @@ public class ProtonTest extends ActiveMQTestBase {
 
       server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
 
+      // Default Page
       AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-      addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       server.getConfiguration().getAddressesSettings().put("#", addressSettings);
 
       server.start();
@@ -230,20 +232,25 @@ public class ProtonTest extends ActiveMQTestBase {
    @Test
    public void testResourceLimitExceptionOnAddressFull() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+      setAddressFullBlockPolicy();
+
       fillAddress(address + 1);
    }
 
    @Test
    public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+      setAddressFullBlockPolicy();
+
       String destinationAddress = address + 1;
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination d = session.createQueue(destinationAddress);
+      MessageProducer p = session.createProducer(d);
+
       fillAddress(destinationAddress);
 
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       Exception e = null;
       try {
-         Destination d = session.createQueue(destinationAddress);
-         MessageProducer p = session.createProducer(d);
          p.send(session.createBytesMessage());
       }
       catch (ResourceAllocationException rae) {
@@ -256,6 +263,7 @@ public class ProtonTest extends ActiveMQTestBase {
    @Test
    public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+      setAddressFullBlockPolicy();
 
       // Only allow 1 credit to be submitted at a time.
       Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
@@ -269,9 +277,13 @@ public class ProtonTest extends ActiveMQTestBase {
       try {
          AmqpSession session = amqpConnection.createSession();
          AmqpSender sender = session.createSender(destinationAddress);
-         sender.setSendTimeout(1000);
-         sendUntilFull(sender);
-         assertTrue(sender.getSender().getCredit() <= 0);
+
+         // Use blocking send to ensure buffered messages do not interfere with credit.
+         sender.setSendTimeout(-1);
+         sendUntilFull(sender, destinationAddress);
+
+         // This should be -1. A single message is buffered in the client, and 0 credit has
been allocated.
+         assertTrue(sender.getSender().getCredit() == -1);
       }
       finally {
          amqpConnection.close();
@@ -282,13 +294,14 @@ public class ProtonTest extends ActiveMQTestBase {
    @Test
    public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+      setAddressFullBlockPolicy();
 
       String destinationAddress = address + 1;
       int messagesSent = fillAddress(destinationAddress);
 
-      AmqpConnection amqpConnection = null;
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = amqpConnection = client.connect();
       try {
-         amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
          AmqpSession session = amqpConnection.createSession();
          AmqpSender sender = session.createSender(destinationAddress);
 
@@ -308,7 +321,8 @@ public class ProtonTest extends ActiveMQTestBase {
 
          // Wait for address to unblock and flow frame to arrive
          Thread.sleep(500);
-         assertTrue(sender.getSender().getCredit() > 0);
+
+         assertTrue(sender.getSender().getCredit() == 0);
          assertNotNull(receiver.receive());
       }
       finally {
@@ -319,11 +333,12 @@ public class ProtonTest extends ActiveMQTestBase {
    @Test
    public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception
{
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+      setAddressFullBlockPolicy();
 
       fillAddress(address + 1);
-      AmqpConnection amqpConnection = null;
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = amqpConnection = client.connect();
       try {
-         amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
          AmqpSession session = amqpConnection.createSession();
          AmqpSender sender = session.createSender(address + 1);
          // Wait for a potential flow frame.
@@ -344,38 +359,62 @@ public class ProtonTest extends ActiveMQTestBase {
    private int fillAddress(String address) throws Exception {
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
       AmqpConnection amqpConnection = client.connect();
+      int messagesSent = 0;
+      Exception exception = null;
       try {
          AmqpSession session = amqpConnection.createSession();
          AmqpSender sender = session.createSender(address);
-         return sendUntilFull(sender);
+         messagesSent = sendUntilFull(sender, null);
+      }
+      catch (Exception e) {
+         exception = e;
       }
       finally {
          amqpConnection.close();
       }
+
+      // Should receive a rejected error
+      assertNotNull(exception);
+      assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
+
+      return messagesSent;
    }
 
-   private int sendUntilFull(AmqpSender sender) throws IOException {
-      AmqpMessage message = new AmqpMessage();
+   private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws
Exception {
+      final AmqpMessage message = new AmqpMessage();
       byte[] payload = new byte[50 * 1024];
+      message.setBytes(payload);
 
-      int sentMessages = 0;
-      int maxMessages = 50;
+      final int maxMessages = 50;
+      final AtomicInteger sentMessages = new AtomicInteger(0);
+      final Exception[] errors = new Exception[1];
+      final CountDownLatch timeout = new CountDownLatch(1);
 
-      Exception e = null;
-      try {
-         for (int i = 0; i < maxMessages; i++) {
-            message.setBytes(payload);
-            sender.send(message);
-            sentMessages++;
+      Runnable sendMessages = new Runnable() {
+         @Override
+         public void run() {
+            try {
+               for (int i = 0; i < maxMessages; i++) {
+                  sender.send(message);
+                  sentMessages.getAndIncrement();
+               }
+               timeout.countDown();
+            }
+            catch (IOException e) {
+               errors[0] = e;
+            }
          }
-      }
-      catch (IOException ioe) {
-         e = ioe;
-      }
+      };
+
+      Thread t = new Thread(sendMessages);
+      t.start();
+
+      timeout.await(5, TimeUnit.SECONDS);
 
-      assertNotNull(e);
-      assertTrue(e.getMessage().contains("amqp:resource-limit-exceeded"));
-      return sentMessages;
+      if (errors[0] != null) {
+         throw errors[0];
+      }
+      return sentMessages.get();
    }
 
    @Test
@@ -398,7 +437,6 @@ public class ProtonTest extends ActiveMQTestBase {
       Destination jmsReplyTo = message.getJMSReplyTo();
       Assert.assertNotNull(jmsReplyTo);
       Assert.assertNotNull(message);
-
    }
 
    @Test
@@ -729,10 +767,13 @@ public class ProtonTest extends ActiveMQTestBase {
 
       consumer.close();
       connection.close();
+
+      // Wait for Acks to be processed and message removed from queue.
+      Thread.sleep(500);
+
       Assert.assertEquals(0, getMessageCount(q));
       long taken = (System.currentTimeMillis() - time) / 1000;
       System.out.println("taken = " + taken);
-
    }
 
    @Test
@@ -1140,6 +1181,14 @@ public class ProtonTest extends ActiveMQTestBase {
       return connection;
    }
 
+   private void setAddressFullBlockPolicy() {
+      // For BLOCK tests
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+   }
+
    public static class AnythingSerializable implements Serializable {
 
       private int count;


Mime
View raw message