activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-1934: fix handling/accounting of sent amqp connection data
Date Sat, 16 Jun 2018 18:10:12 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 838cca49e -> 21ee9fbde


ARTEMIS-1934: fix handling/accounting of sent amqp connection data

updates max frame size tests to verify behaviour seen with standalone
brokers rather than non represenative test-only conditions, as well
as more closely validate the recieved messages

(cherry picked from commit 9a3442f0bd29d77e02eca2f57b4e3adfec5b2f75)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/21ee9fbd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/21ee9fbd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/21ee9fbd

Branch: refs/heads/2.6.x
Commit: 21ee9fbdee8f94abb7cd94a3fca181a77b98c3aa
Parents: 838cca4
Author: Robbie Gemmell <robbie@apache.org>
Authored: Fri Jun 15 20:27:28 2018 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Sat Jun 16 14:10:03 2018 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/artemis/junit/Wait.java |   6 +-
 .../amqp/proton/handler/ProtonHandler.java      |   4 +-
 .../transport/amqp/client/AmqpConnection.java   |   7 +-
 .../integration/amqp/AmqpMaxFrameSizeTest.java  | 236 ++++++++++++++++---
 4 files changed, 221 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21ee9fbd/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
index 679b112..e37539a 100644
--- a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java
@@ -65,7 +65,11 @@ public class Wait {
    }
 
    public static void assertEquals(int size, IntCondition condition, long timeout) throws
Exception {
-      boolean result = waitFor(() -> condition.getCount() == size, timeout);
+      assertEquals(size, condition, timeout, SLEEP_MILLIS);
+   }
+
+   public static void assertEquals(int size, IntCondition condition, long timeout, long sleepMillis)
throws Exception {
+      boolean result = waitFor(() -> condition.getCount() == size, timeout, sleepMillis);
 
       if (!result) {
          Assert.fail(size + " != " + condition.getCount());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21ee9fbd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index fb69208..585f658 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -200,7 +200,8 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener
{
       lock.lock();
       try {
          while (true) {
-            int pending = transport.pending();
+            ByteBuffer head = transport.head();
+            int pending = head.remaining();
 
             if (pending <= 0) {
                break;
@@ -208,7 +209,6 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener
{
 
             // We allocated a Pooled Direct Buffer, that will be sent down the stream
             ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(pending);
-            ByteBuffer head = transport.head();
             buffer.writeBytes(head);
 
             for (EventHandler handler : handlers) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21ee9fbd/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index c77184f..a00f4ae 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -103,6 +103,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection>
implements
    private boolean idleProcessingDisabled;
    private String containerId;
    private boolean authenticated;
+   private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
    private int channelMax = DEFAULT_CHANNEL_MAX;
    private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
    private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
@@ -367,7 +368,11 @@ public class AmqpConnection extends AmqpAbstractResource<Connection>
implements
     * @return the currently set Max Frame Size value.
     */
    public int getMaxFrameSize() {
-      return DEFAULT_MAX_FRAME_SIZE;
+      return maxFrameSize;
+   }
+
+   public void setMaxFrameSize(int maxFrameSize) {
+      this.maxFrameSize = maxFrameSize;
    }
 
    public int getChannelMax() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/21ee9fbd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
index de3ada4..5931ec8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
@@ -20,7 +20,10 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -28,22 +31,40 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
 
-   private static final int FRAME_SIZE = 512;
+   protected static final Logger LOG = LoggerFactory.getLogger(AmqpMaxFrameSizeTest.class);
+
+   private boolean maxFrameSizeConfigSet = false;
+   private static final int CONFIGURED_FRAME_SIZE = 4321;
 
    @Override
    protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
-      params.put("maxFrameSize", FRAME_SIZE);
+      if ("testBrokerAdvertisedConfiguredMaxFrameSize".equals(getTestName())) {
+         maxFrameSizeConfigSet = true;
+         params.put("maxFrameSize", CONFIGURED_FRAME_SIZE);
+      }
+   }
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      // Make the journal file size larger than the frame+message sizes used in the tests,
+      // since it is by default for external brokers and it changes the behaviour.
+      server.getConfiguration().setJournalFileSize(2 * 1024 * 1024);
    }
 
    @Test(timeout = 60000)
-   public void testBrokerHonorsSetMaxFrameSize() throws Exception {
+   public void testBrokerAdvertisedDefaultMaxFrameSize() throws Exception {
+      assertFalse("maxFrameSize should not be explicitly configured", maxFrameSizeConfigSet);
+
       AmqpClient client = createAmqpClient();
       assertNotNull(client);
 
@@ -52,7 +73,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
          @Override
          public void inspectOpenedResource(Connection connection) {
             int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize();
-            if (brokerMaxFrameSize != FRAME_SIZE) {
+            if (brokerMaxFrameSize != AmqpSupport.MAX_FRAME_SIZE_DEFAULT) {
                markAsInvalid("Broker did not send the expected max Frame Size");
             }
          }
@@ -68,56 +89,215 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
-   public void testMultipleTransfers() throws Exception {
+   public void testBrokerAdvertisedConfiguredMaxFrameSize() throws Exception {
+      assertTrue("maxFrameSize should be explicitly configured", maxFrameSizeConfigSet);
+
+      AmqpClient client = createAmqpClient();
+      assertNotNull(client);
+
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize();
+            if (brokerMaxFrameSize != CONFIGURED_FRAME_SIZE) {
+               markAsInvalid("Broker did not send the expected max Frame Size");
+            }
+         }
+      });
+
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         assertNotNull(connection);
+         connection.getStateInspector().assertValid();
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testManyMultiFrameTransfersWithClientMaxFrameSizeSmallerThanBrokers() throws
Exception {
+      final int clientMaxFrameSize = 1024;
+      final int brokerMaxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT;
+      final int messageSize = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT + 5;
+
+      assertTrue("Client maxFrameSize should be smaller than brokers", clientMaxFrameSize
< brokerMaxFrameSize);
+
+      doManyMultiFrameTransfersTestImpl(clientMaxFrameSize, messageSize, brokerMaxFrameSize);
+   }
+
+   @Test(timeout = 60000)
+   public void testManyMultiFrameTransfersWithClientMaxFrameSizeLargerThanBrokers() throws
Exception {
+      final int clientMaxFrameSize = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT;
+      final int brokerMaxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT;
+      final int messageSize = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT + 5;
+
+      assertTrue("Client maxFrameSize should be larger than brokers", clientMaxFrameSize
> brokerMaxFrameSize);
+
+      doManyMultiFrameTransfersTestImpl(clientMaxFrameSize, messageSize, brokerMaxFrameSize);
+   }
+
+   private void doManyMultiFrameTransfersTestImpl(int maxFrameSize, int payloadSize, int
brokerMaxFrameSize) throws Exception {
       server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
 
-      String testQueueName = "ConnectionFrameSize";
-      int nMsgs = 200;
+      int numMsgs = 200;
+      String testQueueName = getTestName();
 
       AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
 
-      try {
-         connection.connect();
+      client.setValidator(new AmqpValidator() {
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize();
+            if (brokerMaxFrameSize != AmqpSupport.MAX_FRAME_SIZE_DEFAULT) {
+               markAsInvalid("Broker did not send the expected max Frame Size");
+            }
+         }
+      });
+
+      AmqpConnection connection = client.createConnection();
+      connection.setMaxFrameSize(maxFrameSize);
 
+      connection.connect();
+      addConnection(connection);
+
+      try {
          AmqpSession session = connection.createSession();
          AmqpSender sender = session.createSender(testQueueName);
 
-         final int payload = FRAME_SIZE * 16;
-
-         for (int i = 0; i < nMsgs; ++i) {
-            AmqpMessage message = createAmqpMessage((byte) 'A', payload);
+         for (int i = 0; i < numMsgs; ++i) {
+            AmqpMessage message = createAmqpMessage(payloadSize);
             sender.send(message);
          }
 
-         int count = getMessageCount(server.getPostOffice(), testQueueName);
-         assertEquals(nMsgs, count);
+         Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueName),
5000, 10);
 
          AmqpReceiver receiver = session.createReceiver(testQueueName);
-         receiver.flow(nMsgs);
-
-         for (int i = 0; i < nMsgs; ++i) {
-            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-            assertNotNull("failed at " + i, message);
-            MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
-            Data data = (Data) wrapped.getBody();
-            System.out.println("received : message: " + data.getValue().getLength());
-            assertEquals(payload, data.getValue().getLength());
-            message.accept();
+         receiver.flow(numMsgs);
+
+         for (int i = 1; i <= numMsgs; ++i) {
+            AmqpMessage receivedMessage = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull("Did not recieve message " + i, receivedMessage);
+
+            verifyMessage(receivedMessage, payloadSize);
+
+            LOG.trace("received : message " + i);
+            receivedMessage.accept();
+         }
+
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test(timeout = 60000)
+   public void testSingleAndMultiFrameTransferClientMaxFrameSizeSmallerThanBrokers() throws
Exception {
+      final int clientMaxFrameSize = 1024;
+      final int brokerMaxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT;
+
+      assertTrue("Client maxFrameSize should be smaller than brokers", clientMaxFrameSize
< brokerMaxFrameSize);
+
+      doSingleAndMultiFrameTransferTestImpl(clientMaxFrameSize, brokerMaxFrameSize);
+   }
+
+   @Test(timeout = 60000)
+   public void testSingleAndMultiFrameTransferWithClientMaxFrameSizeLargerThanBrokers() throws
Exception {
+      final int clientMaxFrameSize = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT;
+      final int brokerMaxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT;
+
+      assertTrue("Client maxFrameSize should be larger than brokers", clientMaxFrameSize
> brokerMaxFrameSize);
+
+      doSingleAndMultiFrameTransferTestImpl(clientMaxFrameSize, brokerMaxFrameSize);
+   }
+
+   private void doSingleAndMultiFrameTransferTestImpl(int maxFrameSize, int brokerMaxFrameSize)
throws Exception {
+      final int messageSize1 = 128;
+      final int messageSize2 = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT + 5;
+
+      assertTrue("messageSize1 should be much smaller than both of the maxFrameSizes",
+                 messageSize1 < maxFrameSize / 2 && messageSize1 < brokerMaxFrameSize
/ 2);
+      assertTrue("messageSize2 should be larger than one of the maxFrameSizes",
+                 messageSize2 > maxFrameSize || messageSize2 > brokerMaxFrameSize);
+
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
+
+      String testQueueName = getTestName();
+
+      AmqpClient client = createAmqpClient();
+
+      client.setValidator(new AmqpValidator() {
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize();
+            if (brokerMaxFrameSize != AmqpSupport.MAX_FRAME_SIZE_DEFAULT) {
+               markAsInvalid("Broker did not send the expected max Frame Size");
+            }
          }
+      });
+
+      AmqpConnection connection = client.createConnection();
+      connection.setMaxFrameSize(maxFrameSize);
+
+      connection.connect();
+      addConnection(connection);
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(testQueueName);
+
+         AmqpMessage message1 = createAmqpMessage(messageSize1);
+         AmqpMessage message2 = createAmqpMessage(messageSize2);
+         sender.send(message1);
+         sender.send(message2);
+
+         Wait.assertEquals(2, () -> getMessageCount(server.getPostOffice(), testQueueName),
5000, 10);
+
+         AmqpReceiver receiver = session.createReceiver(testQueueName);
+         receiver.flow(2);
+
+         AmqpMessage receivedMessage1 = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull("Did not recieve message 1", receivedMessage1);
+         verifyMessage(receivedMessage1, messageSize1);
+         receivedMessage1.accept();
+
+         AmqpMessage receivedMessage2 = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull("Did not recieve message 2", receivedMessage2);
+         verifyMessage(receivedMessage2, messageSize2);
 
+         receivedMessage2.accept();
       } finally {
          connection.close();
       }
    }
 
-   private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+   private AmqpMessage createAmqpMessage(final int payloadSize) {
       AmqpMessage message = new AmqpMessage();
       byte[] payload = new byte[payloadSize];
       for (int i = 0; i < payload.length; i++) {
-         payload[i] = value;
+         // An odd number of digit characters
+         int offset = i % 7;
+         payload[i] = (byte) (48 + offset);
       }
       message.setBytes(payload);
       return message;
    }
+
+   private void verifyMessage(final AmqpMessage message, final int payloadSize) {
+      MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+
+      assertNotNull("Message has no body", wrapped.getBody());
+      assertTrue("Unexpected body type: " + wrapped.getBody().getClass(), wrapped.getBody()
instanceof Data);
+
+      Data data = (Data) wrapped.getBody();
+      Binary binary = data.getValue();
+      assertNotNull("Data section has no content", binary);
+      assertEquals("Unexpected payload length", payloadSize, binary.getLength());
+
+      byte[] binaryContent = binary.getArray();
+      int offset = binary.getArrayOffset();
+      for (int i = 0; i < payloadSize; i++) {
+         byte expected = (byte) (48 + (i % 7));
+         assertEquals("Unexpected content at payload index " + i, expected, binaryContent[i
+ offset]);
+      }
+   }
 }


Mime
View raw message