qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add more queue protocol tests
Date Wed, 06 Dec 2017 22:43:17 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 85c9caae6 -> 93c1e0b8d


QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add more queue protocol tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/93c1e0b8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/93c1e0b8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/93c1e0b8

Branch: refs/heads/master
Commit: 93c1e0b8d04783c028d25e59fe69088cd039a263
Parents: 85c9caa
Author: Keith Wall <keith.wall@gmail.com>
Authored: Wed Dec 6 22:42:37 2017 +0000
Committer: Keith Wall <keith.wall@gmail.com>
Committed: Wed Dec 6 22:42:37 2017 +0000

----------------------------------------------------------------------
 .../protocol/v0_8/ExchangeInteraction.java      |  12 +
 .../tests/protocol/v0_8/QueueInteraction.java   |   7 +
 .../qpid/tests/protocol/v0_8/QueueTest.java     | 343 ++++++++++++++++++-
 3 files changed, 361 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/93c1e0b8/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
index f62a4b3..fedd481 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
@@ -109,6 +109,18 @@ public class ExchangeInteraction
         return this;
     }
 
+    public ExchangeInteraction boundQueue(final String name)
+    {
+        _boundQueue = name;
+        return this;
+    }
+
+    public ExchangeInteraction boundRoutingKey(final String routingKey)
+    {
+        _boundRoutingKey = routingKey;
+        return this;
+    }
+
     public Interaction bound() throws Exception
     {
         return _interaction.sendPerformative(new ExchangeBoundBody(AMQShortString.valueOf(_boundExchange),

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/93c1e0b8/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
index 49ec8bf..cbf3a1a 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -95,6 +96,12 @@ public class QueueInteraction
         return this;
     }
 
+    public QueueInteraction declareArguments(final Map<String,Object> args)
+    {
+        _declareArguments = args == null ? Collections.emptyMap() : new HashMap<>(args);
+        return this;
+    }
+
     public Interaction declare() throws Exception
     {
         return _interaction.sendPerformative(new QueueDeclareBody(0,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/93c1e0b8/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
index fc65e02..ac7e72c 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.isEmptyString;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
+import java.util.Collections;
 
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -42,9 +43,14 @@ import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueuePurgeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindOkBody;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -222,7 +228,7 @@ public class QueueTest extends BrokerAdminUsingTestBase
     @Test
     @SpecificationTest(section = "1.7.2.1",
             description = "The server MUST ignore the auto-delete field if the queue already exists.")
-    @Ignore("The server does not implement this rule.")
+    @Ignore("The server does not ignore the auto-delete field if the queue already exists.")
     public void queueDeclareAutoDeletePreexistingQueue() throws Exception
     {
         try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -409,4 +415,339 @@ public class QueueTest extends BrokerAdminUsingTestBase
         }
     }
 
+    @Test
+    @SpecificationTest(section = "1.7.2.3", description = "bind queue to an exchange")
+    public void queueBind() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String testExchange = "testExchange";
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareName(testExchange).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .queue().bindName(testExchange).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bindRoutingKey("rk1").bind()
+                       .consumeResponse(QueueBindOkBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.3", description = "A server MUST allow ignore duplicate bindings")
+    public void queueBindIgnoreDuplicates() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String testExchange = "testExchange";
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareName(testExchange).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .queue().bindName(testExchange).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bindRoutingKey("rk1").bind()
+                       .consumeResponse(QueueBindOkBody.class)
+                       .queue().bindName(testExchange).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bindRoutingKey("rk1").bind()
+                       .consumeResponse(QueueBindOkBody.class);
+
+            ExchangeBoundOkBody response = interaction.exchange()
+                                                      .boundExchangeName(testExchange)
+                                                      .bound()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.OK)));
+
+            interaction.queue()
+                       .unbindName(testExchange)
+                       .unbindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                       .unbindRoutingKey("rk1")
+                       .unbind()
+                       .consumeResponse(QueueUnbindOkBody.class);
+
+            response = interaction.exchange()
+                                  .boundExchangeName(testExchange)
+                                  .bound()
+                                  .consumeResponse()
+                                  .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.3",
+            description = "The client MUST NOT attempt to bind a queue that does not exist.")
+    public void queueBindUnknownQueue() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            String testExchange = "testExchange";
+            final Interaction interaction = transport.newInteraction();
+            ChannelCloseBody response = interaction.openAnonymousConnection()
+                                              .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                              .exchange().declareName(testExchange).declare()
+                                              .consumeResponse(ExchangeDeclareOkBody.class)
+                                              .queue().bindName(testExchange).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                                              .bind()
+                                              .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.3",
+            description = "Bindings of durable queues to durable exchanges are automatically durable and the server "
+                          + "MUST restore such bindings after a server restart.")
+    public void queueDurableBind() throws Exception
+    {
+        String testExchange = "testExchange";
+        String testRoutingKey = "rk1";
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declareDurable(true).declare()
+                       .consumeResponse(QueueDeclareOkBody.class)
+                       .exchange().declareName(testExchange).declareDurable(true).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .queue().bindName(testExchange).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bindRoutingKey(testRoutingKey)
+                       .bind()
+                       .consumeResponse(QueueBindOkBody.class);
+
+            ExchangeBoundOkBody response = interaction.exchange()
+                                                      .boundExchangeName(testExchange)
+                                                      .boundQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                                                      .boundRoutingKey(testRoutingKey)
+                                                      .bound()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.OK)));
+        }
+
+        assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
+        getBrokerAdmin().restart();
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ExchangeBoundOkBody response = interaction.openAnonymousConnection()
+                                                      .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                      .exchange()
+                                                      .boundExchangeName(testExchange)
+                                                      .boundQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                                                      .boundRoutingKey(testRoutingKey)
+                                                      .bound()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.OK)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.3",
+            description = "The server MUST allow a durable queue to bind to a transient exchange.")
+    public void queueBindDurableQueueToTransientExchange() throws Exception
+    {
+        String testExchange = "testExchange";
+        String testRoutingKey = "rk1";
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declareDurable(true).declare()
+                       .consumeResponse(QueueDeclareOkBody.class)
+                       .exchange().declareName(testExchange).declareDurable(false).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .queue().bindName(testExchange).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bindRoutingKey(testRoutingKey)
+                       .bind()
+                       .consumeResponse(QueueBindOkBody.class);
+
+            ExchangeBoundOkBody response = interaction.exchange()
+                                                      .boundExchangeName(testExchange)
+                                                      .boundQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                                                      .boundRoutingKey(testRoutingKey)
+                                                      .bound()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.OK)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.5", description = "unbind a queue from an exchange")
+    public void queueUnbind() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String testExchange = "testExchange";
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareName(testExchange).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .queue().bindName(testExchange).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bindRoutingKey("rk1").bind()
+                       .consumeResponse(QueueBindOkBody.class)
+                       .queue().unbindName(testExchange).unbindQueueName(BrokerAdmin.TEST_QUEUE_NAME).unbindRoutingKey("rk1").unbind()
+                       .consumeResponse(QueueUnbindOkBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.5", description = "The client MUST NOT attempt to unbind a queue that does "
+                                                          + "not exist.")
+    public void queueUnbindUnknownQueue() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String testExchange = "testExchange";
+            ChannelCloseBody response = interaction.openAnonymousConnection()
+                                                   .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                   .exchange().declareName(testExchange).declare()
+                                                   .consumeResponse(ExchangeDeclareOkBody.class)
+                                                   .queue()
+                                                   .bindName(testExchange)
+                                                   .bindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                   .bindRoutingKey("rk1")
+                                                   .bind()
+                                                   .consumeResponse(QueueBindOkBody.class)
+                                                   .queue()
+                                                   .unbindName(testExchange)
+                                                   .unbindQueueName("unknownQueue")
+                                                   .unbindRoutingKey("rk1")
+                                                   .unbind()
+                                                   .consumeResponse()
+                                                   .getLatestResponse(ChannelCloseBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.5", description = "The client MUST NOT attempt to unbind a queue from an "
+                                                          + "exchange that does not exist.")
+    public void queueUnbindUnknownExchange() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String testExchange = "testExchange";
+            ChannelCloseBody response = interaction.openAnonymousConnection()
+                                                   .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                   .exchange().declareName(testExchange).declare()
+                                                   .consumeResponse(ExchangeDeclareOkBody.class)
+                                                   .queue()
+                                                   .bindName(testExchange)
+                                                   .bindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                   .bindRoutingKey("rk1")
+                                                   .bind()
+                                                   .consumeResponse(QueueBindOkBody.class)
+                                                   .queue()
+                                                   .unbindName("unknownExchange")
+                                                   .unbindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                   .unbindRoutingKey("rk1")
+                                                   .unbind()
+                                                   .consumeResponse()
+                                                   .getLatestResponse(ChannelCloseBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.5", description = "If a unbind fails, the server MUST raise a connection "
+                                                          + "exception")
+    public void queueUnbindUnknownRoutingKey() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String testExchange = "testExchange";
+            ChannelCloseBody response = interaction.openAnonymousConnection()
+                                                   .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                   .exchange().declareName(testExchange).declare()
+                                                   .consumeResponse(ExchangeDeclareOkBody.class)
+                                                   .queue()
+                                                   .bindName(testExchange)
+                                                   .bindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                   .bindRoutingKey("rk1")
+                                                   .bind()
+                                                   .consumeResponse(QueueBindOkBody.class)
+                                                   .queue()
+                                                   .unbindName(testExchange)
+                                                   .unbindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                   .unbindRoutingKey("rk2")
+                                                   .unbind()
+                                                   .consumeResponse()
+                                                   .getLatestResponse(ChannelCloseBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
+
+    /** Qpid specific extension */
+    @Test
+    public void queueDeclareWithAlternateExchange() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final String altExchName = "altExchange";
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange()
+                       .declareName(altExchName)
+                       .declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .queue()
+                       .declareName(BrokerAdmin.TEST_QUEUE_NAME)
+                       .declareArguments(Collections.singletonMap("alternateExchange", altExchName)).declare()
+                       .consumeResponse(QueueDeclareOkBody.class);
+
+            ChannelCloseBody inUseResponse = interaction.exchange()
+                                                        .deleteExchangeName(altExchName)
+                                                        .delete()
+                                                        .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+            assertThat(inUseResponse.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
+            interaction.channel().closeOk();
+
+            interaction.channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .queue()
+                       .deleteName(BrokerAdmin.TEST_QUEUE_NAME)
+                       .delete()
+                       .consumeResponse(QueueDeleteOkBody.class)
+                       .exchange()
+                       .deleteExchangeName(altExchName)
+                       .delete()
+                       .consumeResponse(ExchangeDeleteOkBody.class);
+        }
+    }
+
+    /** Qpid specific extension */
+    @Test
+    @Ignore("TODO - server fails if queue is declared with unknown alternate exchange")
+    public void queueDeclareWithUnknownAlternateExchange() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionCloseBody response = interaction.openAnonymousConnection()
+                                                      .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                      .queue()
+                                                      .declareName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                      .declareArguments(Collections.singletonMap("alternateExchange", "notKnown")).declare()
+                                                      .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+            // TODO server fails - jira required
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message