qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject qpid-broker-j git commit: [Broker-J][AMQP 0-8..0-10] Queue declare arguments that match known attributes should be acceptable
Date Tue, 04 Dec 2018 21:50:27 GMT
Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 50b462d7a -> c80aceab2


[Broker-J][AMQP 0-8..0-10] Queue declare arguments that match known attributes should be acceptable


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c80aceab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c80aceab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c80aceab

Branch: refs/heads/master
Commit: c80aceab29fc8c6c835c0d83554286bbe2ed8e36
Parents: 50b462d
Author: Alex Rudyy <orudyy@apache.org>
Authored: Tue Dec 4 18:04:34 2018 +0000
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Tue Dec 4 18:04:34 2018 +0000

----------------------------------------------------------------------
 .../server/queue/QueueArgumentsConverter.java   |  20 ++-
 .../protocol/v0_10/ServerSessionDelegate.java   |   4 +-
 .../qpid/server/protocol/v0_8/AMQChannel.java   |   2 +-
 .../qpid/tests/protocol/v0_10/Interaction.java  |  22 +++
 .../tests/protocol/v0_10/QueueInteraction.java  |   8 +
 .../v0_10/LargeApplicationHeadersTest.java      |  37 +----
 .../protocol/v0_10/LargeMessageBodyTest.java    |  36 +----
 .../qpid/tests/protocol/v0_10/MessageTest.java  |  81 +++-------
 .../v0_10/extensions/queue/QueueTest.java       | 152 +++++++++++++++++
 .../v0_8/extension/queue/QueueTest.java         | 161 +++++++++++++++++++
 10 files changed, 397 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index 65be871..e67e9a3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -20,16 +20,21 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.AlternateBinding;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectAttribute;
+import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
+import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -130,11 +135,24 @@ public class QueueArgumentsConverter
 
 
     public static Map<String,Object> convertWireArgsToModel(final String queueName,
-                                                            Map<String, Object> wireArguments)
+                                                            final Map<String, Object> wireArguments,
+                                                            final Model model)
     {
         Map<String,Object> modelArguments = new HashMap<>();
         if(wireArguments != null)
         {
+            final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
+            final Map<String, ConfiguredObjectAttribute<?, ?>> attributeTypes =
+                    new HashMap<>(typeRegistry.getAttributeTypes(Queue.class));
+            typeRegistry.getTypeSpecialisations(Queue.class)
+                        .forEach(type -> typeRegistry.getTypeSpecificAttributes(type)
+                                                     .forEach(t -> attributeTypes.put(t.getName(), t)));
+            wireArguments.entrySet()
+                         .stream()
+                         .filter(entry -> attributeTypes.containsKey(entry.getKey())
+                                          && !attributeTypes.get(entry.getKey()).isDerived())
+                         .forEach(entry -> modelArguments.put(entry.getKey(), entry.getValue()));
+
             for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
             {
                 if(wireArguments.containsKey(entry.getKey()))

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 505ea89..5dda560 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -1557,7 +1557,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession>
impleme
             try
             {
                 final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(queueName,
-                                                                                        
            method.getArguments());
+                                                                                        
            method.getArguments(),
+                                                                                        
            session.getAMQPConnection()
+                                                                                        
                   .getModel());
                 final String alternateExchangeName = method.getAlternateExchange();
                 if (method.hasAlternateExchange() && !nameNullOrEmpty(alternateExchangeName))
                 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 4b4f9c7..e2048a0 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -2985,7 +2985,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                 }
 
                 Map<String, Object> attributes =
-                        QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments);
+                        QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments, getModel());
 
                 attributes.put(Queue.NAME, queueNameString);
                 attributes.put(Queue.DURABLE, durable);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
index 03deef5..8804c31 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v0_10;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
 import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
@@ -34,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_10.transport.Method;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
 import org.apache.qpid.tests.protocol.AbstractFrameTransport;
 import org.apache.qpid.tests.protocol.AbstractInteraction;
+import org.apache.qpid.tests.protocol.Response;
 
 public class Interaction extends AbstractInteraction<Interaction>
 {
@@ -204,4 +206,24 @@ public class Interaction extends AbstractInteraction<Interaction>
     {
         return _exchangeInteraction;
     }
+
+    public <T extends Method> T consume(final Class<T> expected,
+                                        final Class<? extends Method>... ignore)
+            throws Exception
+    {
+        final Class<? extends Method>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
+        expectedResponses[ignore.length] = expected;
+
+        T completed = null;
+        do
+        {
+            Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
+            if (expected.isAssignableFrom(response.getBody().getClass()))
+            {
+                completed = (T) response.getBody();
+            }
+        }
+        while (completed == null);
+        return completed;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
index ca3edee..6171466 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.tests.protocol.v0_10;
 
+import java.util.Map;
+
 import org.apache.qpid.server.protocol.v0_10.transport.QueueDeclare;
 import org.apache.qpid.server.protocol.v0_10.transport.QueueDelete;
 import org.apache.qpid.server.protocol.v0_10.transport.QueuePurge;
@@ -143,4 +145,10 @@ public class QueueInteraction
     {
         return _interaction.sendPerformative(_query);
     }
+
+    public QueueInteraction declareArguments(final Map<String, Object> arguments)
+    {
+        _declare.setArguments(arguments);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
index 2872f46..51a376a 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
@@ -24,22 +24,17 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v0_10.transport.*;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -113,12 +108,10 @@ public class LargeApplicationHeadersTest extends BrokerAdminUsingTestBase
                        .consumeResponse()
                        .getLatestResponse(SessionCompleted.class);
 
-
-            MessageTransfer transfer = consumeResponse(interaction,
-                                                       MessageTransfer.class,
-                                                       SessionCompleted.class,
-                                                       SessionCommandPoint.class,
-                                                       SessionConfirmed.class);
+            MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+                                                           SessionCompleted.class,
+                                                           SessionCommandPoint.class,
+                                                           SessionConfirmed.class);
 
             assertThat(transfer.getBodySize(), is(0));
 
@@ -134,28 +127,6 @@ public class LargeApplicationHeadersTest extends BrokerAdminUsingTestBase
         }
     }
 
-    private <T extends Method> T consumeResponse(final Interaction interaction,
-                                                 final Class<T> expected,
-                                                 final Class<? extends Method>... ignore)
-            throws Exception
-    {
-        List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore));
-        possibleResponses.add(expected);
-
-        T completed = null;
-        do
-        {
-            interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()]));
-            Response<?> response = interaction.getLatestResponse();
-            if (expected.isAssignableFrom(response.getBody().getClass()))
-            {
-                completed = (T) response.getBody();
-            }
-        }
-        while (completed == null);
-        return completed;
-    }
-
     private Map<String, Object> createApplicationHeadersThatExceedSingleFrame(final int headerPropertySize, final int maxFrameSize)
     {
         Map<String, Object> applicationHeaders = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
index cc2ef42..6d829d5 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
@@ -26,9 +26,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.stream.IntStream;
 
 import org.junit.Before;
@@ -41,11 +38,9 @@ import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
-import org.apache.qpid.server.protocol.v0_10.transport.Method;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -119,11 +114,10 @@ public class LargeMessageBodyTest extends BrokerAdminUsingTestBase
                        .consumeResponse()
                        .getLatestResponse(SessionCompleted.class);
 
-            MessageTransfer transfer = consumeResponse(interaction,
-                                                       MessageTransfer.class,
-                                                       SessionCompleted.class,
-                                                       SessionCommandPoint.class,
-                                                       SessionConfirmed.class);
+            MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+                                                           SessionCompleted.class,
+                                                           SessionCommandPoint.class,
+                                                           SessionConfirmed.class);
 
             assertThat(transfer.getBodySize(), is(equalTo(messageContent.length)));
             QpidByteBuffer receivedBody = transfer.getBody();
@@ -132,26 +126,4 @@ public class LargeMessageBodyTest extends BrokerAdminUsingTestBase
             assertThat(receivedBytes, is(equalTo(messageContent)));
         }
     }
-
-    private <T extends Method> T consumeResponse(final Interaction interaction,
-                                                 final Class<T> expected,
-                                                 final Class<? extends Method>... ignore)
-            throws Exception
-    {
-        List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore));
-        possibleResponses.add(expected);
-
-        T completed = null;
-        do
-        {
-            interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()]));
-            Response<?> response = interaction.getLatestResponse();
-            if (expected.isAssignableFrom(response.getBody().getClass()))
-            {
-                completed = (T) response.getBody();
-            }
-        }
-        while (completed == null);
-        return completed;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
index 6747451..08baf22 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -27,9 +27,6 @@ import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -41,14 +38,12 @@ import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
-import org.apache.qpid.server.protocol.v0_10.transport.Method;
 import org.apache.qpid.server.protocol.v0_10.transport.Range;
 import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -115,10 +110,8 @@ public class MessageTest extends BrokerAdminUsingTestBase
                        .flushCompleted()
                        .flush();
 
-            SessionCompleted completed = consumeResponse(interaction,
-                                                         SessionCompleted.class,
-                                                         SessionCommandPoint.class,
-                                                         SessionConfirmed.class);
+            SessionCompleted completed =
+                    interaction.consume(SessionCompleted.class, SessionCommandPoint.class, SessionConfirmed.class);
 
             assertThat(completed.getCommands(), is(notNullValue()));
             assertThat(completed.getCommands().includes(0), is(equalTo(true)));
@@ -160,11 +153,10 @@ public class MessageTest extends BrokerAdminUsingTestBase
                        .flowValue(-1)
                        .flow();
 
-            MessageTransfer transfer = consumeResponse(interaction,
-                                                       MessageTransfer.class,
-                                                       SessionCompleted.class,
-                                                       SessionCommandPoint.class,
-                                                       SessionConfirmed.class);
+            MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+                                                           SessionCompleted.class,
+                                                           SessionCommandPoint.class,
+                                                           SessionConfirmed.class);
 
             try (QpidByteBuffer buffer = transfer.getBody())
             {
@@ -210,11 +202,10 @@ public class MessageTest extends BrokerAdminUsingTestBase
                        .flowValue(-1)
                        .flow();
 
-            MessageTransfer transfer = consumeResponse(interaction,
-                                                       MessageTransfer.class,
-                                                       SessionCompleted.class,
-                                                       SessionCommandPoint.class,
-                                                       SessionConfirmed.class);
+            MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+                                                           SessionCompleted.class,
+                                                           SessionCommandPoint.class,
+                                                           SessionConfirmed.class);
 
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
 
@@ -224,11 +215,10 @@ public class MessageTest extends BrokerAdminUsingTestBase
                        .flushCompleted()
                        .flush();
 
-            SessionCompleted completed = consumeResponse(interaction,
-                                                         SessionCompleted.class,
-                                                         SessionCommandPoint.class,
-                                                         SessionConfirmed.class,
-                                                         SessionFlush.class);
+            SessionCompleted completed = interaction.consume(SessionCompleted.class,
+                                                             SessionCommandPoint.class,
+                                                             SessionConfirmed.class,
+                                                             SessionFlush.class);
 
             assertThat(completed.getCommands(), is(notNullValue()));
             assertThat(completed.getCommands().includes(3), is(equalTo(true)));
@@ -273,12 +263,11 @@ public class MessageTest extends BrokerAdminUsingTestBase
                        .flowValue(-1)
                        .flow();
 
-            MessageTransfer transfer = consumeResponse(interaction,
-                                                       MessageTransfer.class,
-                                                       SessionCompleted.class,
-                                                       SessionCommandPoint.class,
-                                                       SessionConfirmed.class,
-                                                       SessionFlush.class);
+            MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+                                                           SessionCompleted.class,
+                                                           SessionCommandPoint.class,
+                                                           SessionConfirmed.class,
+                                                           SessionFlush.class);
 
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
 
@@ -293,11 +282,10 @@ public class MessageTest extends BrokerAdminUsingTestBase
                        .session().flushCompleted()
                                  .flush();
 
-            SessionCompleted completed = consumeResponse(interaction,
-                                                         SessionCompleted.class,
-                                                         SessionCommandPoint.class,
-                                                         SessionConfirmed.class,
-                                                         SessionFlush.class);
+            SessionCompleted completed = interaction.consume(SessionCompleted.class,
+                                                             SessionCommandPoint.class,
+                                                             SessionConfirmed.class,
+                                                             SessionFlush.class);
 
             assertThat(completed.getCommands(), is(notNullValue()));
             assertThat(completed.getCommands().includes(4), is(equalTo(true)));
@@ -305,27 +293,4 @@ public class MessageTest extends BrokerAdminUsingTestBase
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
         }
     }
-
-    private <T extends Method> T consumeResponse(final Interaction interaction,
-                                                 final Class<T> expected,
-                                                 final Class<? extends Method>... ignore)
-            throws Exception
-    {
-        List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore));
-        possibleResponses.add(expected);
-
-        T completed = null;
-        do
-        {
-            interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()]));
-            Response<?> response = interaction.getLatestResponse();
-            if (expected.isAssignableFrom(response.getBody().getClass()))
-            {
-                completed = (T) response.getBody();
-            }
-        }
-        while (completed == null);
-        return completed;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
new file mode 100644
index 0000000..f5216c2
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extensions.queue;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.QueueQueryResult;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+/**
+ * Broker-J specific AMQP 0-10 protocol test: verifies that queue.declare accepts a wire argument
+ * named after a queue attribute ({@code defaultFilters}) and that the argument takes effect.
+ */
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class QueueTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    /**
+     * Declares the test queue with a {@code defaultFilters} declare argument holding a JMS selector
+     * ({@code id=2}), transfers one message with application header {@code id=1} and one with
+     * {@code id=2}, then subscribes and asserts that the received transfer carries the body of the
+     * second message.
+     */
+    @Test
+    @SpecificationTest(section = "10.queue.declare", description = "This command creates or checks a queue.")
+    public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            // declare the queue, passing the queue attribute "defaultFilters" as a wire argument
+            SessionCompleted completed = interaction.openAnonymousConnection()
+                                                    .channelId(1)
+                                                    .attachSession(SESSION_NAME)
+                                                    .queue()
+                                                    .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                                                    .declareArguments(Collections.singletonMap("defaultFilters",
+                                                                                               "{\"selector\":{\"x-filter-jms-selector\":[\"id=2\"]}}"))
+                                                    .declareId(0)
+                                                    .declare()
+                                                    .session()
+                                                    .flushCompleted()
+                                                    .flush()
+                                                    .consumeResponse()
+                                                    .getLatestResponse(SessionCompleted.class);
+
+            assertThat(completed.getCommands().includes(0), is(equalTo(true)));
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+
+            // first message carries id=1 and therefore does not match the selector id=2
+            MessageProperties messageProperties1 = new MessageProperties();
+            messageProperties1.setApplicationHeaders(Collections.singletonMap("id", 1));
+
+            interaction.message()
+                       .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
+                       .transferId(0)
+                       .transferHeader(null, messageProperties1)
+                       .transferBody("Test1".getBytes(StandardCharsets.UTF_8))
+                       .transfer()
+                       .session()
+                       .flushCompleted()
+                       .flush()
+                       .consumeResponse();
+
+            // second message carries id=2 and matches the selector
+            MessageProperties messageProperties2 = new MessageProperties();
+            messageProperties2.setApplicationHeaders(Collections.singletonMap("id", 2));
+            final String body2 = "Message 2 Content";
+            interaction.message()
+                       .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
+                       .transferId(1)
+                       .transferHeader(null, messageProperties2)
+                       .transferBody(body2.getBytes(StandardCharsets.UTF_8))
+                       .transfer()
+                       .session()
+                       .flushCompleted()
+                       .flush()
+                       .consumeResponse();
+
+            // subscribe, grant credit for one message; byte credit -1 is presumably "unlimited" (TODO confirm)
+            final String subscriberName = "Test";
+            interaction.message()
+                       .subscribeDestination(subscriberName)
+                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .subscribeId(0)
+                       .subscribe()
+                       .message()
+                       .flowId(1)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.MESSAGE)
+                       .flowValue(1)
+                       .flow()
+                       .message()
+                       .flowId(2)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.BYTE)
+                       .flowValue(-1)
+                       .flow();
+
+            // wait for the transfer; the other listed session controls are presumably skipped if they arrive first
+            MessageTransfer transfer = interaction.consume(MessageTransfer.class,
+                                                           SessionCompleted.class,
+                                                           SessionCommandPoint.class,
+                                                           SessionConfirmed.class);
+
+            try (QpidByteBuffer buffer = transfer.getBody())
+            {
+                final byte[] dst = new byte[buffer.remaining()];
+                buffer.get(dst);
+                // the delivered message must be the one matching the queue default filter
+                assertThat(new String(dst, UTF_8), is(equalTo(body2)));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
new file mode 100644
index 0000000..af809e1
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
@@ -0,0 +1,177 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.queue;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+/**
+ * Broker-J specific AMQP 0-8 protocol test: verifies that queue.declare accepts a wire argument
+ * named after a queue attribute ({@code defaultFilters}) and that the argument takes effect.
+ */
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class QueueTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    /**
+     * Declares the test queue with a {@code defaultFilters} declare argument holding a JMS selector
+     * ({@code id=2}), registers a consumer, publishes one message with header {@code id=1} and one
+     * with {@code id=2}, and asserts that the delivery, content header and content body received
+     * belong to the second message. Queue depth is checked before and after acknowledging it.
+     */
+    @Test
+    @SpecificationTest(section = "1.7.2.1", description = "declare queue, create if needed")
+    public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            QueueDeclareOkBody response = interaction.openAnonymousConnection()
+                                                     .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                     .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .declareArguments(Collections.singletonMap("defaultFilters",
+                                                                                                "{\"selector\":{\"x-filter-jms-selector\":[\"id=2\"]}}"))
+                                                     .declare()
+                                                     .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+            assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+            assertThat(response.getMessageCount(), is(equalTo(0L)));
+            assertThat(response.getConsumerCount(), is(equalTo(0L)));
+
+            // make sure that wire arguments took effect
+            // by publishing messages and consuming message matching filter
+
+            String consumerTag = "test";
+            interaction.basic().qosPrefetchCount(2)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class)
+                       .channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class);
+
+            String content2 = "Test Content 2";
+            Map<String, Object> messageHeaders2 = Collections.singletonMap("id", 2);
+            String contentType = "text/plain";
+
+            // first message is not matching queue default filter
+            interaction.basic().publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .publishMandatory(true)
+                       .contentHeaderPropertiesContentType(contentType)
+                       .contentHeaderPropertiesHeaders(Collections.singletonMap("id", 1))
+                       .content("Test1")
+                       .publishMessage()
+
+                       // second message is matching queue default filter
+                       .basic().publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .publishMandatory(true)
+                       .contentHeaderPropertiesContentType(contentType)
+                       .contentHeaderPropertiesHeaders(messageHeaders2)
+                       .content(content2)
+                       .publishMessage();
+
+            // second message should be received
+            BasicDeliverBody delivery =
+                    interaction.consumeResponse(BasicDeliverBody.class).getLatestResponse(BasicDeliverBody.class);
+
+            assertThat(delivery.getConsumerTag(), is(equalTo(AMQShortString.valueOf(consumerTag))));
+            assertThat(delivery.getConsumerTag(), is(notNullValue()));
+            assertThat(delivery.getRedelivered(), is(equalTo(false)));
+            assertThat(delivery.getExchange(), is(nullValue()));
+            assertThat(delivery.getRoutingKey(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+
+            ContentHeaderBody header =
+                    interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class);
+
+            assertThat(header.getBodySize(), is(equalTo((long) content2.length())));
+            BasicContentHeaderProperties properties = header.getProperties();
+            Map<String, Object> receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders()));
+            assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders2))));
+            assertThat(properties.getContentTypeAsString(), is(equalTo(contentType)));
+
+            ContentBody content = interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class);
+
+            // try-with-resources releases the payload buffer even if reading it throws
+            final byte[] contentData;
+            try (QpidByteBuffer payload = content.getPayload())
+            {
+                contentData = new byte[payload.remaining()];
+                payload.get(contentData);
+            }
+            String receivedContent = new String(contentData, StandardCharsets.UTF_8);
+
+            // compare with the published body of the second message (not with itself)
+            assertThat(receivedContent, is(equalTo(content2)));
+            // expected depth 2: presumably the filtered-out message plus the delivered, not yet acknowledged one
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
+
+            // after acknowledging the delivery only one message is expected to remain on the queue
+            interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
+                       .ack()
+                       .channel().close().consumeResponse(ChannelCloseOkBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message