From commits-return-47553-apmail-qpid-commits-archive=qpid.apache.org@qpid.apache.org Wed Dec 5 13:44:23 2018 Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9FC91188AD for ; Wed, 5 Dec 2018 13:44:23 +0000 (UTC) Received: (qmail 86470 invoked by uid 500); 5 Dec 2018 13:44:23 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 86441 invoked by uid 500); 5 Dec 2018 13:44:23 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 86432 invoked by uid 99); 5 Dec 2018 13:44:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Dec 2018 13:44:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 000F1E1192; Wed, 5 Dec 2018 13:44:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: orudyy@apache.org To: commits@qpid.apache.org Date: Wed, 05 Dec 2018 13:44:23 -0000 Message-Id: <347258380874463eaad8118e31e58231@git.apache.org> In-Reply-To: <5625ffbe6fa843df9ef4e5bc8b731d3a@git.apache.org> References: <5625ffbe6fa843df9ef4e5bc8b731d3a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] qpid-broker-j git commit: QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for queue declare arguments QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for queue declare arguments Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ab33d1ad Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ab33d1ad Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ab33d1ad Branch: refs/heads/master Commit: ab33d1ad69d3f27f8a56f046fd959ceb0f62d05a Parents: c80acea Author: Alex Rudyy Authored: Wed Dec 5 12:31:48 2018 +0000 Committer: Alex Rudyy Committed: Wed Dec 5 13:39:58 2018 +0000 ---------------------------------------------------------------------- .../server/queue/QueueArgumentsConverter.java | 86 +++++++++++++------- .../qpid/server/protocol/v0_8/AMQChannel.java | 2 +- .../v0_10/extensions/queue/QueueTest.java | 35 ++++++-- .../v0_8/extension/queue/QueueTest.java | 21 ++++- 4 files changed, 103 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index e67e9a3..0c01d52 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.server.queue; -import java.util.Collection; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Objects; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,10 +66,10 @@ public class QueueArgumentsConverter private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; private static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; - static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; + private static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; private static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count"; - static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; - static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; + private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; + private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability"; @@ -142,22 +145,28 @@ public class QueueArgumentsConverter if(wireArguments != null) { final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry(); - final Map> attributeTypes = - new HashMap<>(typeRegistry.getAttributeTypes(Queue.class)); + final List> attributeTypes = + new ArrayList<>(typeRegistry.getAttributeTypes(Queue.class).values()); typeRegistry.getTypeSpecialisations(Queue.class) - .forEach(type -> typeRegistry.getTypeSpecificAttributes(type) - .forEach(t -> attributeTypes.put(t.getName(), t))); + .forEach(type -> attributeTypes.addAll(typeRegistry.getTypeSpecificAttributes(type))); + + final Set wireArgumentNames = new HashSet<>(wireArguments.keySet()); wireArguments.entrySet() .stream() - .filter(entry -> attributeTypes.containsKey(entry.getKey()) - && !attributeTypes.get(entry.getKey()).isDerived()) - .forEach(entry -> modelArguments.put(entry.getKey(), entry.getValue())); + .filter(entry -> attributeTypes.stream() + .anyMatch(type -> Objects.equals(entry.getKey(), type.getName()) + && !type.isDerived())) + .forEach(entry -> { + modelArguments.put(entry.getKey(), entry.getValue()); + wireArgumentNames.remove(entry.getKey()); + }); for(Map.Entry entry : ATTRIBUTE_MAPPINGS.entrySet()) { if(wireArguments.containsKey(entry.getKey())) { modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey())); + wireArgumentNames.remove(entry.getKey()); } } if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) @@ -169,10 +178,13 @@ public class QueueArgumentsConverter modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase())); } - if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP) - && SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP)))) + if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) { - modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS); + wireArgumentNames.remove(QPID_SHARED_MSG_GROUP); + if (SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP)))) + { + modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS); + } } else if(wireArguments.containsKey(QPID_GROUP_HEADER_KEY)) { @@ -189,34 +201,40 @@ public class QueueArgumentsConverter modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString())); } - if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null) + if (wireArguments.containsKey(X_QPID_FLOW_RESUME_CAPACITY)) { - double resumeCapacity = Integer.parseInt(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString()); - double maximumCapacity = Integer.parseInt(wireArguments.get(X_QPID_CAPACITY).toString()); - if (resumeCapacity > maximumCapacity) - { - throw new ConnectionScopedRuntimeException( - "Flow resume size can't be greater than flow control size"); - } - Map context = (Map) modelArguments.get(Queue.CONTEXT); - if (context == null) + wireArgumentNames.remove(X_QPID_FLOW_RESUME_CAPACITY); + if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null) { - context = new HashMap<>(); - modelArguments.put(Queue.CONTEXT, context); + double resumeCapacity = Integer.parseInt(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString()); + double maximumCapacity = Integer.parseInt(wireArguments.get(X_QPID_CAPACITY).toString()); + if (resumeCapacity > maximumCapacity) + { + throw new ConnectionScopedRuntimeException( + "Flow resume size can't be greater than flow control size"); + } + Map context = (Map) modelArguments.get(Queue.CONTEXT); + if (context == null) + { + context = new HashMap<>(); + modelArguments.put(Queue.CONTEXT, context); + } + double ratio = resumeCapacity / maximumCapacity; + context.put(Queue.QUEUE_FLOW_RESUME_LIMIT, String.format("%.2f", ratio * 100.0)); + modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL); } - double ratio = resumeCapacity / maximumCapacity; - context.put(Queue.QUEUE_FLOW_RESUME_LIMIT, String.format("%.2f", ratio * 100.0)); - modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL); } - if (wireArguments.get(ALTERNATE_EXCHANGE) != null) + if (wireArguments.containsKey(ALTERNATE_EXCHANGE)) { + wireArgumentNames.remove(ALTERNATE_EXCHANGE); modelArguments.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, wireArguments.get(ALTERNATE_EXCHANGE))); } else if (wireArguments.containsKey(X_QPID_DLQ_ENABLED)) { + wireArgumentNames.remove(X_QPID_DLQ_ENABLED); Object argument = wireArguments.get(X_QPID_DLQ_ENABLED); if ((argument instanceof Boolean && ((Boolean) argument).booleanValue()) || (argument instanceof String && Boolean.parseBoolean((String)argument))) @@ -226,6 +244,12 @@ public class QueueArgumentsConverter getDeadLetterQueueName(queueName))); } } + + if (!wireArgumentNames.isEmpty()) + { + throw new IllegalArgumentException(String.format("Unsupported queue declare argument(s) : %s", + String.join(",", wireArgumentNames))); + } } return modelArguments; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ab33d1ad/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index e2048a0..fc85de2 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -3107,7 +3107,7 @@ public class AMQChannel extends AbstractAMQPSession