qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject qpid-broker-j git commit: QPID-8038: [Broker-J][AMQP 0-8..0-91] Add exchange protocol tests
Date Mon, 04 Dec 2017 07:55:25 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 4ad30dd90 -> 104a03d29


QPID-8038: [Broker-J][AMQP 0-8..0-91] Add exchange protocol tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/104a03d2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/104a03d2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/104a03d2

Branch: refs/heads/master
Commit: 104a03d297705f26715fee54b3b1ac9f1853841c
Parents: 4ad30dd
Author: Keith Wall <keith.wall@gmail.com>
Authored: Mon Dec 4 07:53:57 2017 +0000
Committer: Keith Wall <keith.wall@gmail.com>
Committed: Mon Dec 4 07:53:57 2017 +0000

----------------------------------------------------------------------
 .../protocol/v0_8/ExchangeInteraction.java      |  69 ++++++-
 .../tests/protocol/v0_8/QueueInteraction.java   |  69 ++++++-
 .../qpid/tests/protocol/v0_8/ExchangeTest.java  | 197 +++++++++++++++++++
 .../qpid/tests/protocol/v0_8/QueueTest.java     |   2 +-
 .../tests/protocol/v0_8/TransactionTest.java    |   2 +-
 5 files changed, 332 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/104a03d2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
index f4ed0a2..9218480 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
@@ -25,28 +25,56 @@ import java.util.Map;
 
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteBody;
 
 public class ExchangeInteraction
 {
     private Interaction _interaction;
-    private String _decalreExchange = "amq.direct";
+    private String _declareExchange = "amq.direct";
     private String _declareType = "direct";
-    private boolean _declarePassive = true;
-    private boolean _declareDurable = true;
+    private boolean _declarePassive = false;
+    private boolean _declareDurable = false;
     private boolean _declareAutoDelete = false;
     private boolean _declareNoWait = false;
     private Map<String, Object> _declareArguments = new HashMap<>();
 
+    private String _boundQueue;
+    private String _boundRoutingKey;
+    private String _boundExchange;
+
+    private String _deleteExchange;
+    private boolean _deleteIfUnused = false;
+    private boolean _deleteNoWait = false;
+
     public ExchangeInteraction(final Interaction interaction)
     {
         _interaction = interaction;
     }
 
+    public ExchangeInteraction declareName(final String name)
+    {
+        _declareExchange = name;
+        return this;
+    }
+
+    public ExchangeInteraction declarePassive(final boolean passive)
+    {
+        _declarePassive = passive;
+        return this;
+    }
+
+    public ExchangeInteraction declareDurable(final boolean durable)
+    {
+        _declareDurable = durable;
+        return this;
+    }
+
     public Interaction declare() throws Exception
     {
         return _interaction.sendPerformative(new ExchangeDeclareBody(0,
-                                                                     AMQShortString.valueOf(_decalreExchange),
+                                                                     AMQShortString.valueOf(_declareExchange),
                                                                      AMQShortString.valueOf(_declareType),
                                                                      _declarePassive,
                                                                      _declareDurable,
@@ -55,4 +83,37 @@ public class ExchangeInteraction
                                                                      _declareNoWait,
                                                                      FieldTable.convertToFieldTable(_declareArguments)));
     }
+
+    public ExchangeInteraction boundExchangeName(final String name)
+    {
+        _boundExchange = name;
+        return this;
+    }
+
+    public Interaction bound() throws Exception
+    {
+        return _interaction.sendPerformative(new ExchangeBoundBody(AMQShortString.valueOf(_boundExchange),
+                                                                   AMQShortString.valueOf(_boundRoutingKey),
+                                                                   AMQShortString.valueOf(_boundQueue)));
+    }
+
+    public ExchangeInteraction deleteExchangeName(final String name)
+    {
+        _deleteExchange = name;
+        return this;
+    }
+
+    public ExchangeInteraction deleteIfUnused(final boolean deleteIfUnused)
+    {
+        _deleteIfUnused = deleteIfUnused;
+        return this;
+    }
+
+    public Interaction delete() throws Exception
+    {
+        return _interaction.sendPerformative(new ExchangeDeleteBody(0,
+                                                                    AMQShortString.valueOf(_deleteExchange),
+                                                                    _deleteIfUnused,
+                                                                    _deleteNoWait));
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/104a03d2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
index e54fa9a..49ec8bf 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
@@ -25,9 +25,11 @@ import java.util.Map;
 
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueBindBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueuePurgeBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindBody;
 
 public class QueueInteraction
 {
@@ -48,6 +50,16 @@ public class QueueInteraction
     private String _purgeName;
     private boolean _purgeNowait;
 
+    private String _bindQueueName;
+    private String _bindExchangeName;
+    private String _bindRoutingKey;
+    private Map<String, Object> _bindArguments = new HashMap<>();
+
+    private String _unbindQueueName;
+    private String _unbindExchangeName;
+    private String _unbindRoutingKey;
+    private Map<String, Object> _unbindArguments = new HashMap<>();
+
     public QueueInteraction(final Interaction interaction)
     {
         _interaction = interaction;
@@ -115,16 +127,71 @@ public class QueueInteraction
                                                                  _deleteIfEmpty,
                                                                  _deleteNowait));
     }
+
     public QueueInteraction purgeName(final String name)
     {
         _purgeName = name;
         return this;
     }
-
     public Interaction purge() throws Exception
     {
         return _interaction.sendPerformative(new QueuePurgeBody(0,
                                                                 AMQShortString.valueOf(_purgeName),
                                                                 _purgeNowait));
     }
+
+    public QueueInteraction bindQueueName(final String bindQueueName)
+    {
+        _bindQueueName = bindQueueName;
+        return this;
+    }
+
+    public QueueInteraction bindName(final String name)
+    {
+        _bindExchangeName = name;
+        return this;
+    }
+
+    public QueueInteraction bindRoutingKey(final String bindRoutingKey)
+    {
+        _bindRoutingKey = bindRoutingKey;
+        return this;
+    }
+
+    public Interaction bind() throws Exception
+    {
+        return _interaction.sendPerformative(new QueueBindBody(0,
+                                                               AMQShortString.valueOf(_bindQueueName),
+                                                               AMQShortString.valueOf(_bindExchangeName),
+                                                               AMQShortString.valueOf(_bindRoutingKey),
+                                                               _deleteNowait,
+                                                               FieldTable.convertToFieldTable(_bindArguments)));
+    }
+
+    public QueueInteraction unbindName(final String name)
+    {
+        _unbindExchangeName = name;
+        return this;
+    }
+
+    public QueueInteraction unbindQueueName(final String name)
+    {
+        _unbindQueueName = name;
+        return this;
+    }
+
+    public QueueInteraction unbindRoutingKey(final String routingKey)
+    {
+        _unbindRoutingKey = routingKey;
+        return this;
+    }
+
+    public Interaction unbind() throws Exception
+    {
+        return _interaction.sendPerformative(new QueueUnbindBody(0,
+                                                                 AMQShortString.valueOf(_unbindQueueName),
+                                                                 AMQShortString.valueOf(_unbindExchangeName),
+                                                                 AMQShortString.valueOf(_unbindRoutingKey),
+                                                                 FieldTable.convertToFieldTable(_unbindArguments)));
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/104a03d2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
new file mode 100644
index 0000000..08d4201
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ExchangeTest extends BrokerAdminUsingTestBase
+{
+    private static final String TEST_EXCHANGE = "testExchange";
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.1", description = "verify exchange exists, create if needed")
+    public void exchangeDeclare() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse().getLatestResponse(ExchangeDeclareOkBody.class);
+
+            ExchangeBoundOkBody response = interaction.exchange()
+                                                      .boundExchangeName(TEST_EXCHANGE)
+                                                      .bound()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
+
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.1",
+            description = "If [durable is] set when creating a new exchange, the exchange will be marked as durable. "
+                          + "Durable exchanges remain active when a server restarts. Non-durable exchanges (transient "
+                          + "exchanges) are purged if/when a server restarts.")
+    public void exchangeDeclareDurable() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareDurable(true).declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse().getLatestResponse(ExchangeDeclareOkBody.class);
+        }
+
+        assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
+        getBrokerAdmin().restart();
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ExchangeBoundOkBody response = interaction.openAnonymousConnection()
+                                                      .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                      .exchange()
+                                                      .boundExchangeName(TEST_EXCHANGE)
+                                                      .bound()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.3",
+            description = "This method deletes an exchange. When an exchange is deleted all queue bindings on the "
+                          + "exchange are cancelled.")
+    public void exchangeDelete() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse().getLatestResponse(ExchangeDeclareOkBody.class);
+
+            ExchangeBoundOkBody boundResponse = interaction.exchange()
+                                                           .boundExchangeName(TEST_EXCHANGE)
+                                                           .bound()
+                                                           .consumeResponse()
+                                                           .getLatestResponse(ExchangeBoundOkBody.class);
+            assertThat(boundResponse.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
+
+            interaction.exchange()
+                       .deleteExchangeName(TEST_EXCHANGE)
+                       .delete()
+                       .consumeResponse()
+                       .getLatestResponse(ExchangeDeleteOkBody.class);
+
+            ExchangeBoundOkBody boundResponse2 = interaction.exchange()
+                                                           .boundExchangeName(TEST_EXCHANGE)
+                                                           .bound()
+                                                           .consumeResponse()
+                                                           .getLatestResponse(ExchangeBoundOkBody.class);
+
+            assertThat(boundResponse2.getReplyCode(), is(equalTo(ExchangeBoundOkBody.EXCHANGE_NOT_FOUND)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.3",
+            description = "If [if-unused is] set, the server will only delete the exchange if it has no queue"
+                          + "bindings. If the exchange has queue bindings the server does not delete it but raises a "
+                          + "channel exception instead.")
+    public void exchangeDeleteInUse() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .queue().bindName(TEST_EXCHANGE).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bind()
+                       .consumeResponse(QueueBindOkBody.class);
+
+            ChannelCloseBody response = interaction.exchange()
+                                                   .deleteExchangeName(TEST_EXCHANGE)
+                                                   .deleteIfUnused(true)
+                                                   .delete()
+                                                   .consumeResponse()
+                                                   .getLatestResponse(ChannelCloseBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.IN_USE)));
+            interaction.channel().closeOk();
+
+            ExchangeBoundOkBody boundResponse = interaction.channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                           .exchange()
+                                                           .boundExchangeName(TEST_EXCHANGE)
+                                                           .bound()
+                                                           .consumeResponse()
+                                                           .getLatestResponse(ExchangeBoundOkBody.class);
+
+            assertThat(boundResponse.getReplyCode(), is(equalTo(ExchangeBoundOkBody.OK)));
+
+            interaction.queue().unbindName(TEST_EXCHANGE).unbindQueueName(BrokerAdmin.TEST_QUEUE_NAME).unbind()
+                       .consumeResponse(QueueUnbindOkBody.class)
+                       .exchange()
+                       .deleteIfUnused(true)
+                       .deleteExchangeName(TEST_EXCHANGE)
+                       .delete()
+                       .consumeResponse()
+                       .getLatestResponse(ExchangeDeleteOkBody.class);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/104a03d2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
index cae90ed..fc65e02 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -140,7 +140,7 @@ public class QueueTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "1.7.2.1",
-            description = "If [durable is] set when creating a new queue, the queue will be marked as durable."
+            description = "If [durable is] set when creating a new queue, the queue will be marked as durable. "
                           + "Durable queues remain active when a server restarts.")
     public void queueDeclareDurable() throws Exception
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/104a03d2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
index df677e8..6e1208a 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
@@ -67,7 +67,7 @@ public class TransactionTest extends BrokerAdminUsingTestBase
                        .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
                        .content("Test")
                        .publishMessage()
-                       .exchange().declare().consumeResponse(ExchangeDeclareOkBody.class);
+                       .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
 
             interaction.tx().commit().consumeResponse(TxCommitOkBody.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message