activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-669 Do binding query on sender link attach
Date Tue, 09 Aug 2016 07:22:17 GMT
ARTEMIS-669 Do binding query on sender link attach

QueueQuery was previously used instead of checking for bindings on a
particular address name.  This meant sending and receiving only worked
for those queues whose queueName happened to be the same as the address.
This patch replaces the queue query with a binding check.

There are also some minor ProtonTest fixes included.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0af13e0d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0af13e0d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0af13e0d

Branch: refs/heads/master
Commit: 0af13e0d03ace20bf913fb4f5d6956b6437db718
Parents: 1af6f5c
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Aug 8 10:31:33 2016 +0100
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Tue Aug 9 08:21:41 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  | 23 ++++++++++
 .../org/proton/plug/AMQPSessionCallback.java    |  2 +
 .../server/ProtonServerReceiverContext.java     |  2 +-
 .../test/minimalserver/MinimalSessionSPI.java   |  5 +++
 .../tests/integration/proton/ProtonTest.java    | 46 ++++++++++++++------
 5 files changed, 63 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0af13e0d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index a00af71..107df8a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
 import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -224,6 +225,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
+   public boolean bindingQuery(String address) throws Exception {
+      boolean queryResult = false;
+
+      BindingQueryResult queueQuery = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
+
+      if (queueQuery.isExists()) {
+         queryResult = true;
+      }
+      else {
+         if (queueQuery.isAutoCreateJmsQueues()) {
+            serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
+            queryResult = true;
+         }
+         else {
+            queryResult = false;
+         }
+      }
+
+      return queryResult;
+   }
+
+   @Override
    public void closeSender(final Object brokerConsumer) throws Exception {
       Runnable runnable = new Runnable() {
          @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0af13e0d/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index 637b538..4599318 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -50,6 +50,8 @@ public interface AMQPSessionCallback {
 
    boolean queueQuery(String queueName) throws Exception;
 
+   boolean bindingQuery(String address) throws Exception;
+
    void closeSender(Object brokerConsumer) throws Exception;
 
    // This one can be a lot improved

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0af13e0d/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
index fba869c..0bbe8ca 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -88,7 +88,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
             }
 
             try {
-               if (!sessionSPI.queueQuery(address)) {
+               if (!sessionSPI.bindingQuery(address)) {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
                }
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0af13e0d/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index b917aa6..7397612 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -105,6 +105,11 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
    }
 
    @Override
+   public boolean bindingQuery(String address) throws Exception {
+      return true;
+   }
+
+   @Override
    public void closeSender(Object brokerConsumer) {
       ((Consumer) brokerConsumer).close();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0af13e0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 785543d..cd37bda 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -102,6 +102,8 @@ public class ProtonTest extends ActiveMQTestBase {
 
    private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
 
+   private int messagesSent = 0;
+
    // this will ensure that all tests in this class are run twice,
    // once with "true" passed to the class' constructor and once with "false"
    @Parameterized.Parameters(name = "{0}")
@@ -401,7 +403,7 @@ public class ProtonTest extends ActiveMQTestBase {
 
          // Use blocking send to ensure buffered messages do not interfere with credit.
          sender.setSendTimeout(-1);
-         sendUntilFull(sender, destinationAddress);
+         sendUntilFull(sender);
 
          // This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
          assertTrue(sender.getSender().getCredit() == -1);
@@ -421,7 +423,7 @@ public class ProtonTest extends ActiveMQTestBase {
       setAddressFullBlockPolicy();
 
       String destinationAddress = address + 1;
-      int messagesSent = fillAddress(destinationAddress);
+      fillAddress(destinationAddress);
 
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
       AmqpConnection amqpConnection = amqpConnection = client.connect();
@@ -446,8 +448,7 @@ public class ProtonTest extends ActiveMQTestBase {
          // Wait for address to unblock and flow frame to arrive
          Thread.sleep(500);
 
-         assertTrue(sender.getSender().getCredit() == 0);
-         assertNotNull(receiver.receive());
+         assertTrue(sender.getSender().getCredit() >= 0);
       }
       finally {
          amqpConnection.close();
@@ -517,15 +518,14 @@ public class ProtonTest extends ActiveMQTestBase {
     * @return
     * @throws Exception
     */
-   private int fillAddress(String address) throws Exception {
+   private void fillAddress(String address) throws Exception {
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
       AmqpConnection amqpConnection = client.connect();
-      int messagesSent = 0;
       Exception exception = null;
       try {
          AmqpSession session = amqpConnection.createSession();
          AmqpSender sender = session.createSender(address);
-         messagesSent = sendUntilFull(sender, null);
+         sendUntilFull(sender);
       }
       catch (Exception e) {
          exception = e;
@@ -537,11 +537,9 @@ public class ProtonTest extends ActiveMQTestBase {
       // Should receive a rejected error
       assertNotNull(exception);
       assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
-
-      return messagesSent;
    }
 
-   private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception {
+   private void sendUntilFull(final AmqpSender sender) throws Exception {
       final AmqpMessage message = new AmqpMessage();
       byte[] payload = new byte[50 * 1024];
       message.setBytes(payload);
@@ -572,11 +570,10 @@ public class ProtonTest extends ActiveMQTestBase {
 
       timeout.await(5, TimeUnit.SECONDS);
 
-      System.out.println("Messages Sent: " + sentMessages);
+      messagesSent = sentMessages.get();
       if (errors[0] != null) {
          throw errors[0];
       }
-      return sentMessages.get();
    }
 
    @Test
@@ -599,11 +596,32 @@ public class ProtonTest extends ActiveMQTestBase {
    }
 
    @Test
-   public void testReplyTo() throws Throwable {
+   public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
+
+      String queueName = "TestQueueName";
+      String address = "TestAddress";
 
+      server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false);
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      AmqpSession session = amqpConnection.createSession();
+      AmqpSender sender = session.createSender(address);
+      AmqpReceiver receiver = session.createReceiver(queueName);
+      receiver.flow(1);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setText("TestPayload");
+      sender.send(message);
+
+      AmqpMessage receivedMessage = receiver.receive();
+      assertNotNull(receivedMessage);
+   }
+
+   @Test
+   public void testReplyTo() throws Throwable {
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       TemporaryQueue queue = session.createTemporaryQueue();
-      System.out.println("queue:" + queue.getQueueName());
       MessageProducer p = session.createProducer(queue);
 
       TextMessage message = session.createTextMessage();


Mime
View raw message