qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject [4/4] qpid-broker-j git commit: QPID-7937 : Define message grouping policy with an enum (default NONE), if messageGroupKey is not supplied in Queue attributes, use the message property "group-id"
Date Sat, 30 Sep 2017 20:31:50 GMT
QPID-7937 : Define message grouping policy with an enum (default NONE); if messageGroupKey is not supplied in the Queue attributes, use the message property "group-id"


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3745dd95
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3745dd95
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3745dd95

Branch: refs/heads/master
Commit: 3745dd952e998b59f27699ad208a80a7c9ffe90e
Parents: 9d5a825
Author: rgodfrey <rgodfrey@apache.org>
Authored: Sat Sep 30 22:10:58 2017 +0200
Committer: rgodfrey <rgodfrey@apache.org>
Committed: Sat Sep 30 22:30:17 2017 +0200

----------------------------------------------------------------------
 .../qpid/server/message/AMQMessageHeader.java   |   2 +
 .../message/internal/InternalMessageHeader.java |   7 ++
 .../org/apache/qpid/server/model/Queue.java     |  18 ++-
 .../apache/qpid/server/queue/AbstractQueue.java |  31 +++--
 .../AssignedConsumerMessageGroupManager.java    |  15 ++-
 .../queue/DefinedGroupMessageGroupManager.java  |  25 +++--
 .../qpid/server/queue/MessageGroupType.java     |  28 +++++
 .../server/queue/QueueArgumentsConverter.java   |  75 +++++++------
 .../VirtualHostStoreUpgraderAndRecoverer.java   |  56 ++++++++++
 .../server/virtualhost/AbstractVirtualHost.java |   7 ++
 .../server/exchange/HeadersBindingTest.java     |   7 +-
 .../VirtualHostQueueCreationTest.java           |   5 +-
 .../MessageConverter_v0_10_to_Internal.java     |   6 +
 .../protocol/v0_10/MessageTransferHeader.java   |   8 +-
 .../v0_8/MessageConverter_v0_8_to_Internal.java |   7 +-
 .../server/protocol/v0_8/MessageMetaData.java   |   7 ++
 .../protocol/v1_0/MessageMetaData_1_0.java      |  13 +++
 .../server/management/amqp/ManagementNode.java  |   6 +
 .../src/main/java/resources/addQueue.html       |  25 +++--
 .../java/resources/js/qpid/management/Queue.js  |   8 +-
 .../resources/js/qpid/management/addQueue.js    |  13 ++-
 .../src/main/java/resources/showQueue.html      |  16 +--
 .../org/apache/qpid/systest/rest/Asserts.java   |   2 +-
 .../server/queue/MessageGroupQueueTest.java     | 112 ++++++++++---------
 24 files changed, 341 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
index a920d89..c7d28d1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
@@ -33,6 +33,8 @@ public interface AMQMessageHeader
 
     String getAppId();
 
+    String getGroupId();
+
     String getMessageId();
 
     String getMimeType();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
index 374e3df..a353a93 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
@@ -132,6 +132,13 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab
     }
 
     @Override
+    public String getGroupId()
+    {
+        final Object jmsxGroupId = _headers.get("JMSXGroupID");
+        return jmsxGroupId == null ? null : String.valueOf(jmsxGroupId);
+    }
+
+    @Override
     public String getMessageId()
     {
         return _messageId;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 855603e..e48c54a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -33,13 +33,7 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInfo;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.CreatingLinkInfo;
-import org.apache.qpid.server.queue.NotificationCheck;
-import org.apache.qpid.server.queue.QueueConsumer;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.util.Deletable;
@@ -70,7 +64,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
     String EXCLUSIVE = "exclusive";
     String MESSAGE_DURABILITY = "messageDurability";
     String MESSAGE_GROUP_KEY = "messageGroupKey";
-    String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
+    String MESSAGE_GROUP_TYPE = "messageGroupType";
     String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
     String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
     String NO_LOCAL = "noLocal";
@@ -159,8 +153,12 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
     @ManagedAttribute( defaultValue = "${queue.maximumDistinctGroups}")
     int getMaximumDistinctGroups();
 
-    @ManagedAttribute
-    boolean isMessageGroupSharedGroups();
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = "queue.messageGroupType")
+    MessageGroupType DEFAULT_MESSAGE_GROUP_TYPE = MessageGroupType.NONE;
+
+    @ManagedAttribute( defaultValue = "${queue.messageGroupType}")
+    MessageGroupType getMessageGroupType();
 
     @SuppressWarnings("unused")
     @ManagedContextDefault( name = "queue.maximumDeliveryAttempts")

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 565a0dd..9b56cbc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -133,8 +133,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     private static final Logger _logger = LoggerFactory.getLogger(AbstractQueue.class);
 
-    public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
-
     private static final QueueNotificationListener NULL_NOTIFICATION_LISTENER = new QueueNotificationListener()
     {
         @Override
@@ -229,6 +227,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     @ManagedAttributeField
     private boolean _messageGroupSharedGroups;
     @ManagedAttributeField
+    private MessageGroupType _messageGroupType;
+    @ManagedAttributeField
     private String _messageGroupDefaultGroup;
     @ManagedAttributeField
     private int _maximumDistinctGroups;
@@ -486,21 +486,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         getEventLogger().message(_logSubject,
                                  getCreatedLogMessage());
 
-        if(getMessageGroupKey() != null)
+        switch(getMessageGroupType())
         {
-            if(isMessageGroupSharedGroups())
-            {
+            case NONE:
+                _messageGroupManager = null;
+                break;
+            case STANDARD:
+                _messageGroupManager = new AssignedConsumerMessageGroupManager(getMessageGroupKey(), getMaximumDistinctGroups());
+                break;
+            case SHARED_GROUPS:
                 _messageGroupManager =
                         new DefinedGroupMessageGroupManager(getMessageGroupKey(), getMessageGroupDefaultGroup(), this);
-            }
-            else
-            {
-                _messageGroupManager = new AssignedConsumerMessageGroupManager(getMessageGroupKey(), getMaximumDistinctGroups());
-            }
-        }
-        else
-        {
-            _messageGroupManager = null;
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown messageGroupType type " + _messageGroupType);
         }
 
         _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
@@ -3048,9 +3047,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     }
 
     @Override
-    public boolean isMessageGroupSharedGroups()
+    public MessageGroupType getMessageGroupType()
     {
-        return _messageGroupSharedGroups;
+        return _messageGroupType;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
index 38eac28..96671e5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
@@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.message.AMQMessageHeader;
+
 
 public class AssignedConsumerMessageGroupManager implements MessageGroupManager
 {
@@ -56,7 +58,8 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
     @Override
     public boolean mightAssign(final QueueEntry entry, QueueConsumer sub)
     {
-        Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+        final AMQMessageHeader messageHeader = entry.getMessage().getMessageHeader();
+        Object groupVal = _groupId == null ? messageHeader.getGroupId() : messageHeader.getHeader(_groupId);
 
         if(groupVal == null)
         {
@@ -77,7 +80,8 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
 
     private boolean assignMessage(QueueConsumer<?,?> sub, QueueEntry entry)
     {
-        Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+        final AMQMessageHeader messageHeader = entry.getMessage().getMessageHeader();
+        Object groupVal = _groupId == null ? messageHeader.getGroupId() : messageHeader.getHeader(_groupId);
         if(groupVal == null)
         {
             return true;
@@ -132,13 +136,14 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
                 return false;
             }
 
-            Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
-            if(groupId == null)
+            final AMQMessageHeader messageHeader = entry.getMessage().getMessageHeader();
+            Object groupVal = _groupId == null ? messageHeader.getGroupId() : messageHeader.getHeader(_groupId);
+            if(groupVal == null)
             {
                 return false;
             }
 
-            Integer group = groupId.hashCode() & _groupMask;
+            Integer group = groupVal.hashCode() & _groupMask;
             QueueConsumer<?,?> assignedSub = _groupMap.get(group);
             if(assignedSub == _sub)
             {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
index c8e790e..06803a7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
@@ -20,21 +20,24 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
-import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.util.StateChangeListener;
-
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.ServerMessage;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.util.StateChangeListener;
+
 public class DefinedGroupMessageGroupManager implements MessageGroupManager
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
     private final String _groupId;
     private final String _defaultGroup;
     private final Map<Object, Group> _groupMap = new HashMap<>();
@@ -250,7 +253,11 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
     {
         ServerMessage message = entry.getMessage();
         AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
-        Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
+        Object groupVal = messageHeader == null
+                ? _defaultGroup
+                : _groupId == null
+                        ? messageHeader.getGroupId()
+                        : messageHeader.getHeader(_groupId);
         if(groupVal == null)
         {
             groupVal = _defaultGroup;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupType.java b/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupType.java
new file mode 100644
index 0000000..f28affd
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public enum MessageGroupType
+{
+    NONE,
+    STANDARD,
+    SHARED_GROUPS
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index aaf9bd1..0c6deb9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -38,56 +38,57 @@ public class QueueArgumentsConverter
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(QueueArgumentsConverter.class);
 
-    public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
-    public static final String X_QPID_CAPACITY = "x-qpid-capacity";
-    public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
-    public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
-    public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
-    public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
-    public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
-
-    public static final String QPID_ALERT_COUNT = "qpid.alert_count";
-    public static final String QPID_ALERT_SIZE = "qpid.alert_size";
-    public static final String QPID_ALERT_REPEAT_GAP = "qpid.alert_repeat_gap";
+    private static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
+    private static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+    private static final String X_QPID_CAPACITY = "x-qpid-capacity";
+    private static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+    private static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+    private static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+    private static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+    private static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
+    private static final String QPID_ALERT_COUNT = "qpid.alert_count";
+    private static final String QPID_ALERT_SIZE = "qpid.alert_size";
+    private static final String QPID_ALERT_REPEAT_GAP = "qpid.alert_repeat_gap";
 
     public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
 
     public static final String X_QPID_DESCRIPTION = "x-qpid-description";
 
-    public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+    private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
 
-    public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
-    public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
-    public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
-    public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
-    public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
-    public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
+    private static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+    static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+    private static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+    static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+    static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+    private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
 
-    public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+    private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
 
-    public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+    private static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
 
-    public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
+    private static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
 
-    public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
+    private static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
 
-    public static final String QPID_EXCLUSIVITY_POLICY = "qpid.exclusivity_policy";
-    public static final String QPID_LIFETIME_POLICY = "qpid.lifetime_policy";
+    private static final String QPID_EXCLUSIVITY_POLICY = "qpid.exclusivity_policy";
+    private static final String QPID_LIFETIME_POLICY = "qpid.lifetime_policy";
 
-    public static final String QPID_POLICY_TYPE = "qpid.policy_type";
-    public static final String QPID_MAX_COUNT = "qpid.max_count";
-    public static final String QPID_MAX_SIZE = "qpid.max_size";
+    private static final String QPID_POLICY_TYPE = "qpid.policy_type";
+    private static final String QPID_MAX_COUNT = "qpid.max_count";
+    private static final String QPID_MAX_SIZE = "qpid.max_size";
 
     /**
      * No-local queue argument is used to support the no-local feature of Durable Subscribers.
      */
-    public static final String QPID_NO_LOCAL = "no-local";
+    private static final String QPID_NO_LOCAL = "no-local";
 
-    static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+    private static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
 
     private static final String ALTERNATE_EXCHANGE = "alternateExchange";
     private static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
-    private static String PROPERTY_DEAD_LETTER_QUEUE_SUFFIX = "qpid.broker_dead_letter_queue_suffix";
+    private static final String PROPERTY_DEAD_LETTER_QUEUE_SUFFIX = "qpid.broker_dead_letter_queue_suffix";
 
     static
     {
@@ -150,10 +151,14 @@ public class QueueArgumentsConverter
                 modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase()));
             }
 
-            if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
+            if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)
+               && SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))))
             {
-                modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,
-                                   AbstractQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))));
+                modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
+            }
+            else if(wireArguments.containsKey(QPID_GROUP_HEADER_KEY))
+            {
+                modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.STANDARD);
             }
 
 
@@ -224,9 +229,9 @@ public class QueueArgumentsConverter
             }
         }
 
-        if(Boolean.TRUE.equals(modelArguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+        if(MessageGroupType.SHARED_GROUPS.equals(modelArguments.get(Queue.MESSAGE_GROUP_TYPE)))
         {
-            wireArguments.put(QPID_SHARED_MSG_GROUP, AbstractQueue.SHARED_MSG_GROUP_ARG_VALUE);
+            wireArguments.put(QPID_SHARED_MSG_GROUP, SHARED_MSG_GROUP_ARG_VALUE);
         }
 
         return wireArguments;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index 1572600..f53fa24 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -745,6 +745,25 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
                     attributes.remove("bindings");
                 }
 
+                if(attributes.containsKey("messageGroupKey"))
+                {
+                    if(attributes.containsKey("messageGroupSharedGroups")
+                       && convertAttributeValueToBoolean("messageGroupSharedGroups",
+                                                         attributes.remove("messageGroupSharedGroups")))
+                    {
+                        attributes.put("messageGroupType", "SHARED_GROUPS");
+
+                    }
+                    else
+                    {
+                        attributes.put("messageGroupType", "STANDARD");
+                    }
+                }
+                else
+                {
+                    attributes.put("messageGroupType", "NONE");
+                }
+
                 _queues.put(record.getId(), (String) attributes.get("name"));
 
                 if (!attributes.equals(new HashMap<>(record.getAttributes())) || addToUpdateMap)
@@ -788,6 +807,43 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
             return value;
         }
 
+        private boolean convertAttributeValueToBoolean(final String attributeName,
+                                                       final Object attributeValue)
+        {
+            boolean value;
+            if (attributeValue instanceof Boolean)
+            {
+                value = (Boolean) attributeValue;
+            }
+            else if (attributeValue instanceof String)
+            {
+                String strValue = (String)attributeValue;
+                if(strValue.equalsIgnoreCase("true"))
+                {
+                    value = true;
+                }
+                else if(strValue.equalsIgnoreCase("false"))
+                {
+                    value = false;
+                }
+                else
+                {
+                    throw new IllegalConfigurationException(String.format(
+                            "Cannot evaluate '%s': %s",
+                            attributeName, attributeValue));
+                }
+
+            }
+            else
+            {
+                throw new IllegalConfigurationException(String.format("Cannot evaluate '%s': %s",
+                                                                      attributeName,
+                                                                      String.valueOf(attributeValue)));
+            }
+            return value;
+        }
+
+
         @Override
         public void complete()
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 230638c..4126c84 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1800,6 +1800,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         }
 
         @Override
+        public String getGroupId()
+        {
+            Object jmsXGroupId = getHeader("JMSXGroupID");
+            return jmsXGroupId == null ? null : String.valueOf(jmsXGroupId);
+        }
+
+        @Override
         public String getMessageId()
         {
             return _message.getMessageId();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 5f5e573..68152d3 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
@@ -75,6 +74,12 @@ public class HeadersBindingTest extends QpidTestCase
         }
 
         @Override
+        public String getGroupId()
+        {
+            return null;
+        }
+
+        @Override
         public String getMessageId()
         {
             return null;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
index f5fc87b..0c591da 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.SystemConfig;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.queue.MessageGroupType;
 import org.apache.qpid.server.queue.PriorityQueue;
 import org.apache.qpid.server.queue.PriorityQueueImpl;
 import org.apache.qpid.server.queue.StandardQueueImpl;
@@ -178,11 +179,11 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
         attributes.put(Queue.ID, UUID.randomUUID());
         attributes.put(Queue.NAME, getTestName());
         attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey");
-        attributes.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, true);
+        attributes.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
 
         Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
         assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY));
-        assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
+        assertEquals(MessageGroupType.SHARED_GROUPS, queue.getAttribute(Queue.MESSAGE_GROUP_TYPE));
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index 1ea0c12..0610d09 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -185,6 +185,12 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
         }
 
         @Override
+        public String getGroupId()
+        {
+            return _delegate.getGroupId();
+        }
+
+        @Override
         public String getMessageId()
         {
             return _delegate.getMessageId();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
index 5eb8406..6156781 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
@@ -42,7 +42,6 @@ class MessageTransferHeader implements AMQMessageHeader
     private final DeliveryProperties _deliveryProps;
     private final MessageProperties _messageProps;
     private final long _arrivalTime;
-    private long _notValidBefore;
 
     public MessageTransferHeader(DeliveryProperties deliveryProps,
                                  MessageProperties messageProps,
@@ -100,6 +99,13 @@ class MessageTransferHeader implements AMQMessageHeader
     }
 
     @Override
+    public String getGroupId()
+    {
+        Object jmsXGroupId = getHeader("JMSXGroupID");
+        return jmsXGroupId == null ? null : String.valueOf(jmsXGroupId);
+    }
+
+    @Override
     public String getMessageId()
     {
         UUID id = _messageProps == null ? null : _messageProps.getMessageId();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index dbd2194..2cb6b9a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -28,7 +28,6 @@ import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtil
 
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -232,6 +231,12 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
         }
 
         @Override
+        public String getGroupId()
+        {
+            return _delegate.getGroupId();
+        }
+
+        @Override
         public String getMessageId()
         {
             return _delegate.getMessageId();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 00953b9..d5baf87 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -241,6 +241,13 @@ public class MessageMetaData implements StorableMessageMetaData
         }
 
         @Override
+        public String getGroupId()
+        {
+            String jmsXGroupId = getProperties().getHeaders().getString("JMSXGroupID");
+            return jmsXGroupId == null ? null : jmsXGroupId;
+        }
+
+        @Override
         public String getCorrelationId()
         {
             return getProperties().getCorrelationIdAsString();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 6d59a15..b82b132 100755
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -629,6 +629,19 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
         }
 
         @Override
+        public String getGroupId()
+        {
+            if (_propertiesSection == null || _propertiesSection.getValue().getGroupId() == null)
+            {
+                return null;
+            }
+            else
+            {
+                return _propertiesSection.getValue().getGroupId();
+            }
+        }
+
+        @Override
         public String getUserId()
         {
             if (_propertiesSection == null || _propertiesSection.getValue().getUserId() == null)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index f4e7b34..4790b78 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1859,6 +1859,12 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
         }
 
         @Override
+        public String getGroupId()
+        {
+            return null;
+        }
+
+        @Override
         public String getMessageId()
         {
             return _messageId;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-http/src/main/java/resources/addQueue.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/addQueue.html b/broker-plugins/management-http/src/main/java/resources/addQueue.html
index f726a3a..18fd1d1 100644
--- a/broker-plugins/management-http/src/main/java/resources/addQueue.html
+++ b/broker-plugins/management-http/src/main/java/resources/addQueue.html
@@ -316,6 +316,19 @@
                     <div class="infoMessage">Configuring maximum delivery retries on a queue which has no alternate binding (DLQ or exchange) <br/> will result in messages being discarded after the limit is reached.</div>
 
                     <div class="clear">
+                        <div class="formLabel-labelCell">Message Group Type:</div>
+                        <div class="formLabel-controlCell">
+                            <input type="text" id="formAddQueue.messageGroupType"
+                                   data-dojo-type="dijit/form/FilteringSelect"
+                                   data-dojo-props="
+                          name: 'messageGroupType',
+                          required: false,
+                          promptMessage: 'Select message grouping type',
+                          title: 'Select message grouping type'"/>
+                        </div>
+
+                    </div>
+                    <div class="clear">
                         <div class="formLabel-labelCell">Message Group Key:</div>
                         <div class="formLabel-controlCell">
                             <input type="text" id="formAddQueue.messageGroupKey"
@@ -329,18 +342,6 @@
                         </div>
                     </div>
                     <div class="clear">
-                        <div class="formLabel-labelCell">Shared Message Groups?</div>
-                        <div class="formLabel-controlCell">
-                            <input type="checkbox" id="formAddQueue.messageGroupSharedGroups"
-                                   dojoType="dijit.form.CheckBox"
-                                   data-dojo-props="
-                                  name: 'messageGroupSharedGroups',
-                                  value: 'messageGroupSharedGroups',
-                                  checked: false,
-                                  title: 'Controls where a shared groups feature is enabled'"/>
-                        </div>
-                    </div>
-                    <div class="clear">
                         <div class="formLabel-labelCell">Hold on Publish Enabled?</div>
                         <div class="formLabel-controlCell">
                             <input type="checkbox" id="formAddQueue.holdOnPublishEnabled"

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index 5a21ab9..91f8793 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -411,7 +411,7 @@ define(["dojo/_base/declare",
                         "alternateBinding",
                         "messageGroups",
                         "messageGroupKey",
-                        "messageGroupSharedGroups",
+                        "messageGroupType",
                         "maximumDeliveryAttempts",
                         "holdOnPublishEnabled"]);
 
@@ -579,11 +579,11 @@ define(["dojo/_base/declare",
             {
                 this.maximumQueueDepth.style.display = "none";
             }
-            if (this.queueData["messageGroupKey"])
+            var messageGroupType = this.queueData["messageGroupType"];
+            this["messageGroupType"].innerHTML = entities.encode(messageGroupType);
+            if (this.queueData["messageGroupKey"] || (messageGroupType && messageGroupType !== "NONE"))
             {
                 this.messageGroupKey.innerHTML = entities.encode(String(this.queueData["messageGroupKey"]));
-                this.messageGroupSharedGroups.innerHTML =
-                    entities.encode(String(this.queueData["messageGroupSharedGroups"]));
                 this.messageGroups.style.display = "block";
             }
             else

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
index d59ea25..337c94d 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
@@ -92,6 +92,7 @@ define(["dojo/dom",
                 this.queueType = registry.byId("formAddQueue.type");
                 this.context = registry.byId("formAddQueue.context");
                 this.overflowPolicyWidget = registry.byId("formAddQueue.overflowPolicy");
+                this.messageGroupTypeWidget = registry.byId("formAddQueue.messageGroupType");
                 this.editNodeBanner = dom.byId("addQueue.editNoteBanner");
 
 
@@ -215,10 +216,16 @@ define(["dojo/dom",
             {
                 this.alternateBindingLoadPromise.then(lang.hitch(this, function ()
                 {
-                    var validValues = this.management.metadata.getMetaData("Queue",
+                    var validOverflowValues = this.management.metadata.getMetaData("Queue",
                         this.initialData.type).attributes.overflowPolicy.validValues;
-                    var validValueStore = util.makeTypeStore(validValues);
-                    this.overflowPolicyWidget.set("store", validValueStore);
+                    var validOverflowValueStore = util.makeTypeStore(validOverflowValues);
+                    this.overflowPolicyWidget.set("store", validOverflowValueStore);
+
+                    var validGroupingValues = this.management.metadata.getMetaData("Queue",
+                        this.initialData.type).attributes.messageGroupType.validValues;
+                    var validGroupingValueStore = util.makeTypeStore(validGroupingValues);
+                    this.messageGroupTypeWidget.set("store", validGroupingValueStore);
+
 
                     util.applyToWidgets(this.form.domNode,
                         "Queue",

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-http/src/main/java/resources/showQueue.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/showQueue.html b/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 87d7ce2..7424614 100644
--- a/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -93,14 +93,14 @@
             <div class="minimumMessageTtl"></div>
         </div>
         <div class="clear messageGroups">
-          <div class="clear">
-            <div class="messageGroupKeyLabel formLabel-labelCell ">Message Group Key:</div>
-            <div class="messageGroupKey"></div>
-          </div>
-          <div class="clear">
-            <div class="messageGroupSharedGroupsLabel formLabel-labelCell">Shared Message Groups:</div>
-            <div class="messageGroupSharedGroups"></div>
-          </div>
+            <div class="clear">
+                <div class="messageGroupTypeLabel formLabel-labelCell">Message Group Type:</div>
+                <div class="messageGroupType"></div>
+            </div>
+            <div class="clear">
+                <div class="messageGroupKeyLabel formLabel-labelCell ">Message Group Key:</div>
+                <div class="messageGroupKey"></div>
+            </div>
         </div>
         <div class="clear"></div>
     </div>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index 0689eaa..1c36eec 100644
--- a/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -123,7 +123,7 @@ public class Asserts
                                         LastValueQueue.LVQ_KEY,
                                         SortedQueue.SORT_KEY,
                                         Queue.MESSAGE_GROUP_KEY,
-                                        Queue.MESSAGE_GROUP_SHARED_GROUPS,
+                                        Queue.MESSAGE_GROUP_TYPE,
                                         PriorityQueue.PRIORITIES,
                                         ConfiguredObject.CONTEXT,
                                         ConfiguredObject.DESIRED_STATE,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
index f44ffe6..577bc2d 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
@@ -77,15 +77,25 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
 
     public void testSimpleGroupAssignment() throws Exception
     {
-        simpleGroupAssignment(false);
+        simpleGroupAssignment(false, false);
     }
 
     public void testSharedGroupSimpleGroupAssignment() throws Exception
     {
-        simpleGroupAssignment(true);
+        simpleGroupAssignment(true, false);
     }
 
 
+    public void testSimpleGroupAssignmentWithJMSXGroupID() throws Exception
+    {
+        simpleGroupAssignment(false, true);
+    }
+
+    public void testSharedGroupSimpleGroupAssignmentWithJMSXGroupID() throws Exception
+    {
+        simpleGroupAssignment(true, true);
+    }
+
     /**
      * Pre populate the queue with messages with groups as follows
      *
@@ -113,15 +123,15 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
      *  c1 ack --->
      *
      */
-    private void simpleGroupAssignment(boolean sharedGroups) throws QpidException, JMSException
+    private void simpleGroupAssignment(boolean sharedGroups, final boolean useDefaultGroup) throws QpidException, JMSException
     {
-        createQueueAndProducer(sharedGroups);
+        createQueueAndProducer(sharedGroups, useDefaultGroup);
 
         String[] groups = { "ONE", "TWO"};
 
         for (int msg = 0; msg < 4; msg++)
         {
-            producer.send(createMessage(msg, groups[msg % groups.length]));
+            producer.send(createMessage(msg, groups[msg % groups.length], useDefaultGroup));
         }
         producerSession.commit();
         producer.close();
@@ -149,15 +159,15 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
         Message cs2Received2 = consumer2.receive(getReceiveTimeout());
 
         assertNotNull("Consumer 2 should have received second message", cs2Received2);
-        assertEquals("Differing groups", cs2Received2.getStringProperty("group"),
-                     cs2Received.getStringProperty("group"));
+        assertEquals("Differing groups", cs2Received2.getStringProperty(useDefaultGroup ? "JMSXGroupID" :"group"),
+                     cs2Received.getStringProperty(useDefaultGroup ? "JMSXGroupID" :"group"));
 
         cs1Received.acknowledge();
         Message cs1Received2 = consumer1.receive(getReceiveTimeout());
 
         assertNotNull("Consumer 1 should have received second message", cs1Received2);
-        assertEquals("Differing groups", cs1Received2.getStringProperty("group"),
-                     cs1Received.getStringProperty("group"));
+        assertEquals("Differing groups", cs1Received2.getStringProperty(useDefaultGroup ? "JMSXGroupID" :"group"),
+                     cs1Received.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
 
         cs1Received2.acknowledge();
         cs2Received2.acknowledge();
@@ -166,20 +176,21 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
         assertNull(consumer2.receive(getShortReceiveTimeout()));
     }
 
-    private void createQueueAndProducer(final boolean sharedGroups) throws QpidException, JMSException
+    private void createQueueAndProducer(final boolean sharedGroups, final boolean useDefaultKey) throws QpidException, JMSException
     {
-        if(isBroker10())
+        if(isBroker10() || useDefaultKey)
         {
             final Map<String, Object> arguments = new HashMap<>();
-            arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_KEY, "group");
-            arguments.put(ConfiguredObject.DURABLE, "false");
-            arguments.put(ConfiguredObject.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.toString());
-            if(sharedGroups)
+            if(!useDefaultKey)
             {
-                arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_SHARED_GROUPS, "true");
+                arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_KEY, "group");
             }
+            arguments.put(ConfiguredObject.DURABLE, "false");
+            arguments.put(ConfiguredObject.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.toString());
+            arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_TYPE, sharedGroups ? MessageGroupType.SHARED_GROUPS.name() : MessageGroupType.STANDARD.name());
+
             createEntityUsingAmqpManagement(QUEUE, producerSession, "org.apache.qpid.Queue", arguments);
-            queue = producerSession.createQueue(QUEUE);
+            queue = producerSession.createQueue(isBroker10() ? QUEUE : "ADDR:"+QUEUE+" ; {assert : never, node: { type: queue } }");
         }
         else
         {
@@ -204,12 +215,12 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
 
     public void testConsumerCloseGroupAssignment() throws Exception
     {
-        consumerCloseGroupAssignment(false);
+        consumerCloseGroupAssignment(false, false);
     }
 
     public void testSharedGroupConsumerCloseGroupAssignment() throws Exception
     {
-        consumerCloseGroupAssignment(true);
+        consumerCloseGroupAssignment(true, false);
     }
 
     /**
@@ -229,14 +240,14 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
      * requires c2 to go "backwards" in the queue).
      *
      **/
-    private void consumerCloseGroupAssignment(boolean sharedGroups) throws QpidException, JMSException
+    private void consumerCloseGroupAssignment(boolean sharedGroups, final boolean useDefaultGroup) throws QpidException, JMSException
     {
-        createQueueAndProducer(sharedGroups);
+        createQueueAndProducer(sharedGroups, false);
 
-        producer.send(createMessage(1, "ONE"));
-        producer.send(createMessage(2, "ONE"));
-        producer.send(createMessage(3, "TWO"));
-        producer.send(createMessage(4, "ONE"));
+        producer.send(createMessage(1, "ONE", useDefaultGroup));
+        producer.send(createMessage(2, "ONE", useDefaultGroup));
+        producer.send(createMessage(3, "TWO", useDefaultGroup));
+        producer.send(createMessage(4, "ONE", useDefaultGroup));
 
         producerSession.commit();
         producer.close();
@@ -285,7 +296,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
         Message cs2Received3 = consumer2.receive(getReceiveTimeout());
 
         assertNotNull("Consumer 2 should have received second message", cs2Received3);
-        assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group"));
+        assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
         assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg"));
 
         if(is010)
@@ -301,7 +312,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
         Message cs2Received4 = consumer2.receive(getReceiveTimeout());
 
         assertNotNull("Consumer 2 should have received third message", cs2Received4);
-        assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group"));
+        assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
         assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg"));
         if(is010)
         {
@@ -320,12 +331,12 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
     
     public void testConsumerCloseWithRelease() throws Exception
     {
-        consumerCloseWithRelease(false);
+        consumerCloseWithRelease(false, false);
     }
 
     public void testSharedGroupConsumerCloseWithRelease() throws Exception
     {
-        consumerCloseWithRelease(true);
+        consumerCloseWithRelease(true, false);
     }
 
 
@@ -346,14 +357,14 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
      * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
      *
      */
-    private void consumerCloseWithRelease(boolean sharedGroups) throws QpidException, JMSException
+    private void consumerCloseWithRelease(boolean sharedGroups, final boolean useDefaultGroup) throws QpidException, JMSException
     {
-        createQueueAndProducer(sharedGroups);
+        createQueueAndProducer(sharedGroups, false);
 
-        producer.send(createMessage(1, "ONE"));
-        producer.send(createMessage(2, "ONE"));
-        producer.send(createMessage(3, "TWO"));
-        producer.send(createMessage(4, "ONE"));
+        producer.send(createMessage(1, "ONE", useDefaultGroup));
+        producer.send(createMessage(2, "ONE", useDefaultGroup));
+        producer.send(createMessage(3, "TWO", useDefaultGroup));
+        producer.send(createMessage(4, "ONE", useDefaultGroup));
 
         producerSession.commit();
         producer.close();
@@ -398,7 +409,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
         received = consumer2.receive(getReceiveTimeout());
 
         assertNotNull("Consumer 2 should now have received second message", received);
-        assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+        assertEquals("Unexpected group", "ONE", received.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
         assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
         assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
                    received.getJMSRedelivered());
@@ -415,7 +426,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
         received = consumer2.receive(getReceiveTimeout());
 
         assertNotNull("Consumer 2 should have received a third message", received);
-        assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+        assertEquals("Unexpected group", "ONE", received.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
         assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
 
         if(is010)
@@ -430,7 +441,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
         received = consumer2.receive(getReceiveTimeout());
 
         assertNotNull("Consumer 2 should have received a fourth message", received);
-        assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+        assertEquals("Unexpected group", "ONE", received.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
         assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
 
         if(is010)
@@ -447,22 +458,22 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
 
     public void testGroupAssignmentSurvivesEmpty() throws JMSException, QpidException
     {
-        groupAssignmentOnEmpty(false);
+        groupAssignmentOnEmpty(false, false);
     }
 
     public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws JMSException, QpidException
     {
-        groupAssignmentOnEmpty(true);
+        groupAssignmentOnEmpty(true, false);
     }
 
-    private void groupAssignmentOnEmpty(boolean sharedGroups) throws QpidException, JMSException
+    private void groupAssignmentOnEmpty(boolean sharedGroups, final boolean useDefaultGroup) throws QpidException, JMSException
     {
-        createQueueAndProducer(sharedGroups);
+        createQueueAndProducer(sharedGroups, useDefaultGroup);
 
-        producer.send(createMessage(1, "ONE"));
-        producer.send(createMessage(2, "TWO"));
-        producer.send(createMessage(3, "THREE"));
-        producer.send(createMessage(4, "ONE"));
+        producer.send(createMessage(1, "ONE", useDefaultGroup));
+        producer.send(createMessage(2, "TWO", useDefaultGroup));
+        producer.send(createMessage(3, "THREE", useDefaultGroup));
+        producer.send(createMessage(4, "ONE", useDefaultGroup));
 
         producerSession.commit();
         producer.close();
@@ -561,11 +572,11 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
 
     }
 
-    private Message createMessage(int msg, String group) throws JMSException
+    private Message createMessage(int msg, String group, final boolean useDefaultGroup) throws JMSException
     {
         Message send = producerSession.createTextMessage("Message: " + msg);
         send.setIntProperty("msg", msg);
-        send.setStringProperty("group", group);
+        send.setStringProperty(useDefaultGroup ? "JMSXGroupID" : "group", group);
 
         return send;
     }
@@ -580,7 +591,8 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
 
         consumerConnection = getConnectionWithPrefetch(1);
 
-        createQueueAndProducer(true);
+        final boolean useDefaultGroup = false;
+        createQueueAndProducer(true, useDefaultGroup);
 
         int numMessages = 100;
         SharedGroupTestMessageListener groupingTestMessageListener = new SharedGroupTestMessageListener(numMessages);
@@ -602,7 +614,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
 
         for(int i = 1; i <= numMessages; i++)
         {
-            producer.send(createMessage(i, "GROUP"));
+            producer.send(createMessage(i, "GROUP", useDefaultGroup));
         }
         producerSession.commit();
         producer.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message