activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: [ARTEMIS-2863] Add support to pause dispatch when group rebalance
Date Wed, 19 Aug 2020 16:04:59 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c506cc  [ARTEMIS-2863] Add support to pause dispatch when group rebalance
     new b766969  This closes #3230
2c506cc is described below

commit 2c506cc52a8eca3fb0d7ee3a5a4e5bf4f3b82d72
Author: Michael Pearce <michael.andre.pearce@me.com>
AuthorDate: Tue Aug 4 18:50:49 2020 +0100

    [ARTEMIS-2863] Add support to pause dispatch when group rebalance
    
    Add test case
    Add implementation
    Add docs
---
 .../activemq/artemis/api/core/QueueAttributes.java |  15 +++
 .../artemis/api/core/QueueConfiguration.java       |  18 +++
 .../apache/activemq/artemis/logs/AuditLogger.java  |   8 ++
 .../api/config/ActiveMQDefaultConfiguration.java   |   6 +
 .../artemis/api/core/client/ClientSession.java     |   2 +
 .../artemis/api/core/management/QueueControl.java  |   6 +
 .../artemis/core/client/impl/QueueQueryImpl.java   |  11 +-
 .../protocol/core/impl/ActiveMQSessionContext.java |   2 +-
 .../impl/wireformat/CreateQueueMessage_V2.java     |  26 ++++
 .../wireformat/CreateSharedQueueMessage_V2.java    |  24 ++++
 .../SessionQueueQueryResponseMessage_V3.java       |  30 ++++-
 .../artemis/core/server/QueueQueryResult.java      |   9 ++
 .../deployers/impl/FileConfigurationParser.java    |   8 ++
 .../management/impl/ActiveMQServerControlImpl.java |   1 +
 .../core/management/impl/QueueControlImpl.java     |  15 +++
 .../core/management/impl/view/QueueView.java       |   3 +
 .../artemis/core/persistence/QueueBindingInfo.java |   2 +
 .../journal/AbstractJournalStorageManager.java     |   2 +-
 .../codec/PersistentQueueBindingEncoding.java      |  19 +++
 .../core/postoffice/impl/PostOfficeImpl.java       |   4 +
 .../apache/activemq/artemis/core/server/Queue.java |   4 +
 .../core/server/impl/ActiveMQServerImpl.java       |   7 +-
 .../core/server/impl/QueueConfigurationUtils.java  |   1 +
 .../artemis/core/server/impl/QueueConsumers.java   |   3 +
 .../core/server/impl/QueueConsumersImpl.java       |   6 +
 .../artemis/core/server/impl/QueueImpl.java        |  63 +++++++--
 .../core/settings/impl/AddressSettings.java        |  41 +++++-
 .../resources/schema/artemis-configuration.xsd     |   9 ++
 .../impl/journal/QueueBindingEncodingTest.java     |   5 +
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  10 ++
 .../src/test/resources/artemis-configuration.xsd   |   9 ++
 docs/user-manual/en/message-grouping.md            |  11 +-
 .../tests/integration/jms/client/GroupingTest.java | 149 +++++++++++++++++++++
 .../management/QueueControlUsingCoreTest.java      |   5 +
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  10 ++
 35 files changed, 517 insertions(+), 27 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
index ca4d4d8..13dc12e 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
@@ -28,6 +28,7 @@ public class QueueAttributes implements Serializable {
    public static final String MAX_CONSUMERS = "max-consumers";
    public static final String EXCLUSIVE = "exclusive";
    public static final String GROUP_REBALANCE = "group-rebalance";
+   public static final String GROUP_REBALANCE_PAUSE_DISPATCH = "group-rebalance-pause-dispatch";
    public static final String GROUP_BUCKETS = "group-buckets";
    public static final String GROUP_FIRST_KEY = "group-first-key";
    public static final String LAST_VALUE = "last-value";
@@ -49,6 +50,7 @@ public class QueueAttributes implements Serializable {
    private Integer maxConsumers;
    private Boolean exclusive;
    private Boolean groupRebalance;
+   private Boolean groupRebalancePauseDispatch;
    private Integer groupBuckets;
    private SimpleString groupFirstKey;
    private Boolean lastValue;
@@ -93,6 +95,8 @@ public class QueueAttributes implements Serializable {
             setConsumerPriority(Integer.valueOf(value));
          } else if (key.equals(GROUP_REBALANCE)) {
             setGroupRebalance(Boolean.valueOf(value));
+         } else if (key.equals(GROUP_REBALANCE_PAUSE_DISPATCH)) {
+            setGroupRebalancePauseDispatch(Boolean.valueOf(value));
          } else if (key.equals(GROUP_BUCKETS)) {
             setGroupBuckets(Integer.valueOf(value));
          } else if (key.equals(GROUP_FIRST_KEY)) {
@@ -119,6 +123,7 @@ public class QueueAttributes implements Serializable {
          .setRingSize(this.getRingSize())
          .setEnabled(this.isEnabled())
          .setGroupRebalance(this.getGroupRebalance())
+         .setGroupRebalancePauseDispatch(this.getGroupRebalancePauseDispatch())
          .setNonDestructive(this.getNonDestructive())
          .setLastValue(this.getLastValue())
          .setFilterString(this.getFilterString())
@@ -146,6 +151,7 @@ public class QueueAttributes implements Serializable {
             .setRingSize(queueConfiguration.getRingSize())
             .setEnabled(queueConfiguration.isEnabled())
             .setGroupRebalance(queueConfiguration.isGroupRebalance())
+            .setGroupRebalancePauseDispatch(queueConfiguration.isGroupRebalancePauseDispatch())
             .setNonDestructive(queueConfiguration.isNonDestructive())
             .setLastValue(queueConfiguration.isLastValue())
             .setFilterString(queueConfiguration.getFilterString())
@@ -280,6 +286,15 @@ public class QueueAttributes implements Serializable {
       return this;
    }
 
+   public Boolean getGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
+   public QueueAttributes setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
+      return this;
+   }
+
    public Integer getGroupBuckets() {
       return groupBuckets;
    }
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
index afe438e..2257721 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueConfiguration.java
@@ -56,6 +56,7 @@ public class QueueConfiguration implements Serializable {
    public static final String MAX_CONSUMERS = "max-consumers";
    public static final String EXCLUSIVE = "exclusive";
    public static final String GROUP_REBALANCE = "group-rebalance";
+   public static final String GROUP_REBALANCE_PAUSE_DISPATCH = "group-rebalance-pause-dispatch";
    public static final String GROUP_BUCKETS = "group-buckets";
    public static final String GROUP_FIRST_KEY = "group-first-key";
    public static final String LAST_VALUE = "last-value";
@@ -87,6 +88,7 @@ public class QueueConfiguration implements Serializable {
    private Integer maxConsumers;
    private Boolean exclusive;
    private Boolean groupRebalance;
+   private Boolean groupRebalancePauseDispatch;
    private Integer groupBuckets;
    private SimpleString groupFirstKey;
    private Boolean lastValue;
@@ -459,6 +461,15 @@ public class QueueConfiguration implements Serializable {
       return this;
    }
 
+   public Boolean isGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
+   public QueueConfiguration setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
+      return this;
+   }
+
    public Integer getGroupBuckets() {
       return groupBuckets;
    }
@@ -630,6 +641,9 @@ public class QueueConfiguration implements Serializable {
       if (isGroupRebalance() != null) {
          builder.add(GROUP_REBALANCE, isGroupRebalance());
       }
+      if (isGroupRebalancePauseDispatch() != null) {
+         builder.add(GROUP_REBALANCE_PAUSE_DISPATCH, isGroupRebalancePauseDispatch());
+      }
       if (getGroupBuckets() != null) {
          builder.add(GROUP_BUCKETS, getGroupBuckets());
       }
@@ -746,6 +760,8 @@ public class QueueConfiguration implements Serializable {
          return false;
       if (!Objects.equals(groupRebalance, that.groupRebalance))
          return false;
+      if (!Objects.equals(groupRebalancePauseDispatch, that.groupRebalancePauseDispatch))
+         return false;
       if (!Objects.equals(groupBuckets, that.groupBuckets))
          return false;
       if (!Objects.equals(groupFirstKey, that.groupFirstKey))
@@ -802,6 +818,7 @@ public class QueueConfiguration implements Serializable {
       result = 31 * result + Objects.hashCode(maxConsumers);
       result = 31 * result + Objects.hashCode(exclusive);
       result = 31 * result + Objects.hashCode(groupRebalance);
+      result = 31 * result + Objects.hashCode(groupRebalancePauseDispatch);
       result = 31 * result + Objects.hashCode(groupBuckets);
       result = 31 * result + Objects.hashCode(groupFirstKey);
       result = 31 * result + Objects.hashCode(lastValue);
@@ -838,6 +855,7 @@ public class QueueConfiguration implements Serializable {
          + ", maxConsumers=" + maxConsumers
          + ", exclusive=" + exclusive
          + ", groupRebalance=" + groupRebalance
+         + ", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch
          + ", groupBuckets=" + groupBuckets
          + ", groupFirstKey=" + groupFirstKey
          + ", lastValue=" + lastValue
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 209402d..7fcc0ed 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2729,4 +2729,12 @@ public interface AuditLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.INFO)
    @Message(id = 601734, value = "User {0} failed to resume address {1}", format = Message.Format.MESSAGE_FORMAT)
    void resumeAddressFailure(String user, String queueName);
+
+   static void isGroupRebalancePauseDispatch(Object source) {
+      LOGGER.isGroupRebalancePauseDispatch(getCaller(), source);
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601735, value = "User {0} is getting group rebalance pause dispatch property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+   void isGroupRebalancePauseDispatch(String user, Object source, Object... args);
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 60656e5..f7d33ff 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -534,6 +534,8 @@ public final class ActiveMQDefaultConfiguration {
 
    public static final boolean DEFAULT_GROUP_REBALANCE = false;
 
+   public static final boolean DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH = false;
+
    public static final SimpleString DEFAULT_GROUP_FIRST_KEY = null;
 
    public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST;
@@ -1503,6 +1505,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_GROUP_REBALANCE;
    }
 
+   public static boolean getDefaultGroupRebalancePauseDispatch() {
+      return DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH;
+   }
+
    public static SimpleString getDefaultGroupFirstKey() {
       return DEFAULT_GROUP_FIRST_KEY;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index 5db79aa..d2c0f43 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -162,6 +162,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
 
       Boolean isGroupRebalance();
 
+      Boolean isGroupRebalancePauseDispatch();
+
       Integer getGroupBuckets();
 
       SimpleString getGroupFirstKey();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index bbb27c0..b07cc3c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -702,6 +702,12 @@ public interface QueueControl {
    boolean isGroupRebalance();
 
    /**
+    * Returns whether the dispatch is paused when groups of this queue are automatically rebalanced.
+    */
+   @Attribute(desc = "whether the dispatch is paused when groups of this queue are automatically rebalanced")
+   boolean isGroupRebalancePauseDispatch();
+
+   /**
     * Will return the group buckets.
     */
    @Attribute(desc = "Get the group buckets")
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index d28f4f4..d532dc0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -52,6 +52,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
 
    private final Boolean groupRebalance;
 
+   private final Boolean groupRebalancePauseDispatch;
+
    private final Integer groupBuckets;
 
    private final SimpleString groupFirstKey;
@@ -162,7 +164,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final Long autoDeleteDelay,
                          final Long autoDeleteMessageCount,
                          final Integer defaultConsumerWindowSize) {
-      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
+      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
    }
 
    public QueueQueryImpl(final boolean durable,
@@ -180,6 +182,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final RoutingType routingType,
                          final Boolean exclusive,
                          final Boolean groupRebalance,
+                         final Boolean groupRebalancePauseDispatch,
                          final Integer groupBuckets,
                          final SimpleString groupFirstKey,
                          final Boolean lastValue,
@@ -208,6 +211,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
       this.routingType = routingType;
       this.exclusive = exclusive;
       this.groupRebalance = groupRebalance;
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
       this.groupBuckets = groupBuckets;
       this.groupFirstKey = groupFirstKey;
       this.lastValue = lastValue;
@@ -329,6 +333,11 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
    }
 
    @Override
+   public Boolean isGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
+   @Override
    public Integer getGroupBuckets() {
       return groupBuckets;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index a4aa8ab..b4c1dcd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -905,7 +905,7 @@ public class ActiveMQSessionContext extends SessionContext {
       // We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect.
       // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection
       if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) {
-         CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), [...]
+         CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.isGroupRebalancePauseDispatch(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo. [...]
 
          sendPacketWithoutLock(sessionChannel, createQueueRequest);
       }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
index 3fe20a7..aeebbe4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
@@ -37,6 +37,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
 
    private Boolean groupRebalance;
 
+   private Boolean groupRebalancePauseDispatch;
+
    private Integer groupBuckets;
 
    private SimpleString groupFirstKey;
@@ -81,6 +83,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
          requiresResponse,
          queueAttributes.getExclusive(),
          queueAttributes.getGroupRebalance(),
+         queueAttributes.getGroupRebalancePauseDispatch(),
          queueAttributes.getGroupBuckets(),
          queueAttributes.getGroupFirstKey(),
          queueAttributes.getLastValue(),
@@ -111,6 +114,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
          requiresResponse,
          queueConfiguration.isExclusive(),
          queueConfiguration.isGroupRebalance(),
+         queueConfiguration.isGroupRebalancePauseDispatch(),
          queueConfiguration.getGroupBuckets(),
          queueConfiguration.getGroupFirstKey(),
          queueConfiguration.isLastValue(),
@@ -138,6 +142,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
                                 final boolean requiresResponse,
                                 final Boolean exclusive,
                                 final Boolean groupRebalance,
+                                final Boolean groupRebalancePauseDispatch,
                                 final Integer groupBuckets,
                                 final SimpleString groupFirstKey,
                                 final Boolean lastValue,
@@ -164,6 +169,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.exclusive = exclusive;
       this.groupRebalance = groupRebalance;
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
       this.groupBuckets = groupBuckets;
       this.groupFirstKey = groupFirstKey;
       this.lastValue = lastValue;
@@ -191,6 +197,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
          .setRoutingType(routingType)
          .setExclusive(exclusive)
          .setGroupRebalance(groupRebalance)
+         .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch)
          .setNonDestructive(nonDestructive)
          .setLastValue(lastValue)
          .setFilterString(filterString)
@@ -219,6 +226,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
       buff.append(", exclusive=" + exclusive);
       buff.append(", groupRebalance=" + groupRebalance);
+      buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch);
       buff.append(", groupBuckets=" + groupBuckets);
       buff.append(", groupFirstKey=" + groupFirstKey);
       buff.append(", lastValue=" + lastValue);
@@ -324,6 +332,14 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       this.groupRebalance = groupRebalance;
    }
 
+   public Boolean isGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
+   public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
+   }
+
    public Integer getGroupBuckets() {
       return groupBuckets;
    }
@@ -401,6 +417,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       buffer.writeNullableSimpleString(groupFirstKey);
       BufferHelper.writeNullableLong(buffer, ringSize);
       BufferHelper.writeNullableBoolean(buffer, enabled);
+      BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
    }
 
    @Override
@@ -434,6 +451,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       if (buffer.readableBytes() > 0) {
          enabled = BufferHelper.readNullableBoolean(buffer);
       }
+      if (buffer.readableBytes() > 0) {
+         groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -446,6 +466,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       result = prime * result + (purgeOnNoConsumers ? 1231 : 1237);
       result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
       result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
+      result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237);
       result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
       result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode());
       result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
@@ -486,6 +507,11 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
             return false;
       } else if (!groupRebalance.equals(other.groupRebalance))
          return false;
+      if (groupRebalancePauseDispatch == null) {
+         if (other.groupRebalancePauseDispatch != null)
+            return false;
+      } else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch))
+         return false;
       if (groupBuckets == null) {
          if (other.groupBuckets != null)
             return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
index 7588e74..17bd1ad 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -29,6 +29,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
    private Boolean purgeOnNoConsumers;
    private Boolean exclusive;
    private Boolean groupRebalance;
+   private Boolean groupRebalancePauseDispatch;
    private Integer groupBuckets;
    private SimpleString groupFirstKey;
    private Boolean lastValue;
@@ -53,6 +54,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
          queueConfiguration.isPurgeOnNoConsumers(),
          queueConfiguration.isExclusive(),
          queueConfiguration.isGroupRebalance(),
+         queueConfiguration.isGroupRebalancePauseDispatch(),
          queueConfiguration.getGroupBuckets(),
          queueConfiguration.getGroupFirstKey(),
          queueConfiguration.isLastValue(),
@@ -78,6 +80,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
                                       final Boolean purgeOnNoConsumers,
                                       final Boolean exclusive,
                                       final Boolean groupRebalance,
+                                      final Boolean groupRebalancePauseDispatch,
                                       final Integer groupBuckets,
                                       final SimpleString groupFirstKey,
                                       final Boolean lastValue,
@@ -102,6 +105,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.exclusive = exclusive;
       this.groupRebalance = groupRebalance;
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
       this.groupBuckets = groupBuckets;
       this.groupFirstKey = groupFirstKey;
       this.lastValue = lastValue;
@@ -201,6 +205,14 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       this.groupRebalance = groupRebalance;
    }
 
+   public Boolean isGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
+   public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
+   }
+
    public Integer getGroupBuckets() {
       return groupBuckets;
    }
@@ -264,6 +276,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
          .setRoutingType(routingType)
          .setExclusive(exclusive)
          .setGroupRebalance(groupRebalance)
+         .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch)
          .setNonDestructive(nonDestructive)
          .setLastValue(lastValue)
          .setFilterString(filterString)
@@ -293,6 +306,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
       buff.append(", exclusive=" + exclusive);
       buff.append(", groupRebalance=" + groupRebalance);
+      buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch);
       buff.append(", groupBuckets=" + groupBuckets);
       buff.append(", groupFirstKey=" + groupFirstKey);
       buff.append(", lastValue=" + lastValue);
@@ -334,6 +348,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       buffer.writeNullableSimpleString(groupFirstKey);
       BufferHelper.writeNullableLong(buffer, ringSize);
       BufferHelper.writeNullableBoolean(buffer, enabled);
+      BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
    }
 
    @Override
@@ -370,6 +385,9 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       if (buffer.readableBytes() > 0) {
          enabled = buffer.readNullableBoolean();
       }
+      if (buffer.readableBytes() > 0) {
+         groupRebalancePauseDispatch = buffer.readNullableBoolean();
+      }
    }
 
    @Override
@@ -386,6 +404,7 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       result = prime * result + (purgeOnNoConsumers == null ? 0 : purgeOnNoConsumers ? 1231 : 1237);
       result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
       result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
+      result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237);
       result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
       result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode());
       result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
@@ -451,6 +470,11 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
             return false;
       } else if (!groupRebalance.equals(other.groupRebalance))
          return false;
+      if (groupRebalancePauseDispatch == null) {
+         if (other.groupRebalancePauseDispatch != null)
+            return false;
+      } else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch))
+         return false;
       if (groupBuckets == null) {
          if (other.groupBuckets != null)
             return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index 5f09e96..bf83222 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -38,6 +38,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
    protected Boolean groupRebalance;
 
+   protected Boolean groupRebalancePauseDispatch;
+
    protected Integer groupBuckets;
 
    protected SimpleString groupFirstKey;
@@ -65,11 +67,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
    private Boolean enabled;
 
    public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
-      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestr [...]
+      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), res [...]
    }
 
    public SessionQueueQueryResponseMessage_V3() {
-      this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null);
+      this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null);
    }
 
    private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@@ -87,6 +89,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
                                                final int maxConsumers,
                                                final Boolean exclusive,
                                                final Boolean groupRebalance,
+                                               final Boolean groupRebalancePauseDispatch,
                                                final Integer groupBuckets,
                                                final SimpleString groupFirstKey,
                                                final Boolean lastValue,
@@ -132,6 +135,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
       this.groupRebalance = groupRebalance;
 
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
+
       this.groupBuckets = groupBuckets;
 
       this.groupFirstKey = groupFirstKey;
@@ -255,6 +260,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       this.groupRebalance = groupRebalance;
    }
 
+   public Boolean isGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
+   public void setGroupRebalancePauseDispatch(Boolean groupRebalancePauseDispatch) {
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
+   }
+
    public Integer getGroupBuckets() {
       return groupBuckets;
    }
@@ -321,6 +334,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       buffer.writeNullableSimpleString(groupFirstKey);
       BufferHelper.writeNullableLong(buffer, ringSize);
       BufferHelper.writeNullableBoolean(buffer, enabled);
+      BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
 
    }
 
@@ -358,6 +372,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       if (buffer.readableBytes() > 0) {
          enabled = BufferHelper.readNullableBoolean(buffer);
       }
+      if (buffer.readableBytes() > 0) {
+         groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -370,6 +387,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       result = prime * result + maxConsumers;
       result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
       result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
+      result = prime * result + (groupRebalancePauseDispatch == null ? 0 : groupRebalancePauseDispatch ? 1231 : 1237);
       result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
       result = prime * result + (groupFirstKey == null ? 0 : groupFirstKey.hashCode());
       result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
@@ -402,6 +420,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       buff.append(", maxConsumers=" + maxConsumers);
       buff.append(", exclusive=" + exclusive);
       buff.append(", groupRebalance=" + groupRebalance);
+      buff.append(", groupRebalancePauseDispatch=" + groupRebalancePauseDispatch);
       buff.append(", groupBuckets=" + groupBuckets);
       buff.append(", groupFirstKey=" + groupFirstKey);
       buff.append(", lastValue=" + lastValue);
@@ -420,7 +439,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
    @Override
    public ClientSession.QueueQuery toQueueQuery() {
-      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMe [...]
+      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getA [...]
    }
 
    @Override
@@ -446,6 +465,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
             return false;
       } else if (!groupRebalance.equals(other.groupRebalance))
          return false;
+      if (groupRebalancePauseDispatch == null) {
+         if (other.groupRebalancePauseDispatch != null)
+            return false;
+      } else if (!groupRebalancePauseDispatch.equals(other.groupRebalancePauseDispatch))
+         return false;
       if (groupBuckets == null) {
          if (other.groupBuckets != null)
             return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index 3ae86dc..798f417 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -51,6 +51,8 @@ public class QueueQueryResult {
 
    private Boolean groupRebalance;
 
+   private Boolean groupRebalancePauseDispatch;
+
    private Integer groupBuckets;
 
    private SimpleString groupFirstKey;
@@ -92,6 +94,7 @@ public class QueueQueryResult {
                            final int maxConsumers,
                            final Boolean exclusive,
                            final Boolean groupRebalance,
+                           final Boolean groupRebalancePauseDispatch,
                            final Integer groupBuckets,
                            final SimpleString groupFirstKey,
                            final Boolean lastValue,
@@ -135,6 +138,8 @@ public class QueueQueryResult {
 
       this.groupRebalance = groupRebalance;
 
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
+
       this.groupBuckets = groupBuckets;
 
       this.groupFirstKey = groupFirstKey;
@@ -250,6 +255,10 @@ public class QueueQueryResult {
       return groupRebalance;
    }
 
+   public Boolean isGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
    public Integer getGroupBuckets() {
       return groupBuckets;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index c42085b..95cae11 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -216,6 +216,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String DEFAULT_GROUP_REBALANCE = "default-group-rebalance";
 
+   private static final String DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH = "default-group-rebalance-pause-dispatch";
+
    private static final String DEFAULT_GROUP_BUCKETS = "default-group-buckets";
 
    private static final String DEFAULT_GROUP_FIRST_KEY = "default-group-first-key";
@@ -1149,6 +1151,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setDefaultExclusiveQueue(XMLUtil.parseBoolean(child));
          } else if (DEFAULT_GROUP_REBALANCE.equalsIgnoreCase(name)) {
             addressSettings.setDefaultGroupRebalance(XMLUtil.parseBoolean(child));
+         } else if (DEFAULT_GROUP_REBALANCE_PAUSE_DISPATCH.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultGroupRebalancePauseDispatch(XMLUtil.parseBoolean(child));
          } else if (DEFAULT_GROUP_BUCKETS.equalsIgnoreCase(name)) {
             addressSettings.setDefaultGroupBuckets(XMLUtil.parseInt(child));
          } else if (DEFAULT_GROUP_FIRST_KEY.equalsIgnoreCase(name)) {
@@ -1294,6 +1298,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       String user = null;
       Boolean exclusive = null;
       Boolean groupRebalance = null;
+      Boolean groupRebalancePauseDispatch = null;
       Integer groupBuckets = null;
       String groupFirstKey = null;
       Boolean lastValue = null;
@@ -1316,6 +1321,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             exclusive = Boolean.parseBoolean(item.getNodeValue());
          } else if (item.getNodeName().equals("group-rebalance")) {
             groupRebalance = Boolean.parseBoolean(item.getNodeValue());
+         } else if (item.getNodeName().equals("group-rebalance-pause-dispatch")) {
+            groupRebalancePauseDispatch = Boolean.parseBoolean(item.getNodeValue());
          } else if (item.getNodeName().equals("group-buckets")) {
             groupBuckets = Integer.parseInt(item.getNodeValue());
          } else if (item.getNodeName().equals("group-first-key")) {
@@ -1361,6 +1368,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
               .setUser(user)
               .setExclusive(exclusive)
               .setGroupRebalance(groupRebalance)
+              .setGroupRebalancePauseDispatch(groupRebalancePauseDispatch)
               .setGroupBuckets(groupBuckets)
               .setGroupFirstKey(groupFirstKey)
               .setLastValue(lastValue)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 9084beb..e6493af 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2881,6 +2881,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             .add("defaultNonDestructive", addressSettings.isDefaultNonDestructive())
             .add("defaultExclusiveQueue", addressSettings.isDefaultExclusiveQueue())
             .add("defaultGroupRebalance", addressSettings.isDefaultGroupRebalance())
+            .add("defaultGroupRebalancePauseDispatch", addressSettings.isDefaultGroupRebalancePauseDispatch())
             .add("defaultGroupBuckets", addressSettings.getDefaultGroupBuckets())
             .add("defaultGroupFirstKey", addressSettings.getDefaultGroupFirstKey() == null ? "" : addressSettings.getDefaultGroupFirstKey().toString())
             .add("defaultMaxConsumers", addressSettings.getDefaultMaxConsumers())
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 0ce51b8..33d241f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1808,6 +1808,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
+   public boolean isGroupRebalancePauseDispatch() {
+      if (AuditLogger.isEnabled()) {
+         AuditLogger.isGroupRebalancePauseDispatch(queue);
+      }
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.isGroupRebalancePauseDispatch();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public int getGroupBuckets() {
       if (AuditLogger.isEnabled()) {
          AuditLogger.getGroupBuckets(queue);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java
index c84b09b..deaa0bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/QueueView.java
@@ -65,6 +65,7 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
          .add("lastValue", toString(queue.isLastValue()))
          .add("scheduledCount", toString(queue.getScheduledCount()))
          .add("groupRebalance", toString(queue.isGroupRebalance()))
+         .add("groupRebalancePauseDispatch", toString(queue.isGroupRebalancePauseDispatch()))
          .add("groupBuckets", toString(queue.getGroupBuckets()))
          .add("groupFirstKey", toString(queue.getGroupFirstKey()));
       return obj;
@@ -122,6 +123,8 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
             return q.getScheduledCount();
          case "groupRebalance":
             return queue.isGroupRebalance();
+         case "groupRebalancePauseDispatch":
+            return queue.isGroupRebalancePauseDispatch();
          case "groupBuckets":
             return queue.getGroupBuckets();
          case "groupFirstKey":
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 62ed7af..9d3c96f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -92,6 +92,8 @@ public interface QueueBindingInfo {
 
    boolean isGroupRebalance();
 
+   boolean isGroupRebalancePauseDispatch();
+
    int getGroupBuckets();
 
    SimpleString getGroupFirstKey();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 551c64f..8a4b0b9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1307,7 +1307,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch( [...]
+      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDi [...]
 
       readLock();
       try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index e753395..1dc7b11 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -66,6 +66,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
 
    public boolean groupRebalance;
 
+   public boolean groupRebalancePauseDispatch;
+
    public int groupBuckets;
 
    public SimpleString groupFirstKey;
@@ -118,6 +120,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          configurationManaged +
          ", groupRebalance=" +
          groupRebalance +
+         ", groupRebalancePauseDispatch=" +
+         groupRebalancePauseDispatch +
          ", groupBuckets=" +
          groupBuckets +
          ", groupFirstKey=" +
@@ -141,6 +145,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
                                          final boolean enabled,
                                          final boolean exclusive,
                                          final boolean groupRebalance,
+                                         final boolean groupRebalancePauseDispatch,
                                          final int groupBuckets,
                                          final SimpleString groupFirstKey,
                                          final boolean lastValue,
@@ -161,6 +166,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       this.autoCreated = autoCreated;
       this.maxConsumers = maxConsumers;
       this.purgeOnNoConsumers = purgeOnNoConsumers;
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
       this.enabled = enabled;
       this.exclusive = exclusive;
       this.groupRebalance = groupRebalance;
@@ -347,6 +353,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
    }
 
    @Override
+   public boolean isGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
+   @Override
    public int getGroupBuckets() {
       return groupBuckets;
    }
@@ -482,6 +493,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       } else {
          enabled = ActiveMQDefaultConfiguration.getDefaultEnabled();
       }
+
+      if (buffer.readableBytes() > 0) {
+         groupRebalancePauseDispatch = buffer.readBoolean();
+      } else {
+         groupRebalancePauseDispatch = ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch();
+      }
    }
 
    @Override
@@ -509,6 +526,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       buffer.writeNullableSimpleString(groupFirstKey);
       buffer.writeLong(ringSize);
       buffer.writeBoolean(enabled);
+      buffer.writeBoolean(groupRebalancePauseDispatch);
    }
 
    @Override
@@ -533,6 +551,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          DataConstants.SIZE_LONG +
          SimpleString.sizeofNullableString(groupFirstKey) +
          DataConstants.SIZE_LONG +
+         DataConstants.SIZE_BOOLEAN +
          DataConstants.SIZE_BOOLEAN;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9e063fa..a3576b3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -682,6 +682,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                changed = true;
                queue.setGroupRebalance(queueConfiguration.isGroupRebalance());
             }
+            if ((forceUpdate || queueConfiguration.isGroupRebalancePauseDispatch() != null) && !Objects.equals(queue.isGroupRebalancePauseDispatch(), queueConfiguration.isGroupRebalancePauseDispatch())) {
+               changed = true;
+               queue.setGroupRebalancePauseDispatch(queueConfiguration.isGroupRebalancePauseDispatch());
+            }
             if ((forceUpdate || queueConfiguration.getGroupBuckets() != null) && !Objects.equals(queue.getGroupBuckets(), queueConfiguration.getGroupBuckets())) {
                changed = true;
                queue.setGroupBuckets(queueConfiguration.getGroupBuckets());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index be7b155..0f2a071 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -136,6 +136,10 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void setGroupRebalance(boolean groupRebalance);
 
+   boolean isGroupRebalancePauseDispatch();
+
+   void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach);
+
    SimpleString getGroupFirstKey();
 
    void setGroupFirstKey(SimpleString groupFirstKey);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 3384912..4fe0ff4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -969,6 +969,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       long defaultDelayBeforeDispatch = addressSettings.getDefaultDelayBeforeDispatch();
       int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
       boolean defaultGroupRebalance = addressSettings.isDefaultGroupRebalance();
+      boolean defaultGroupRebalancePauseDispatch = addressSettings.isDefaultGroupRebalancePauseDispatch();
       int defaultGroupBuckets = addressSettings.getDefaultGroupBuckets();
       SimpleString defaultGroupFirstKey = addressSettings.getDefaultGroupFirstKey();
       long autoDeleteQueuesDelay = addressSettings.getAutoDeleteQueuesDelay();
@@ -985,12 +986,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
          SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-         response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumers [...]
+         response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue [...]
       } else if (realName.equals(managementAddress)) {
          // make an exception for the management address (see HORNETQ-29)
-         response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
+         response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
       } else {
-         response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessage [...]
+         response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDelete [...]
       }
 
       return response;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
index 0b3b1ce..5bbf7c2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConfigurationUtils.java
@@ -27,6 +27,7 @@ public class QueueConfigurationUtils {
       config.setMaxConsumers(config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers());
       config.setExclusive(config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive());
       config.setGroupRebalance(config.isGroupRebalance() == null ? as.isDefaultGroupRebalance() : config.isGroupRebalance());
+      config.setGroupRebalancePauseDispatch(config.isGroupRebalancePauseDispatch() == null ? as.isDefaultGroupRebalancePauseDispatch() : config.isGroupRebalancePauseDispatch());
       config.setGroupBuckets(config.getGroupBuckets() == null ? as.getDefaultGroupBuckets() : config.getGroupBuckets());
       config.setGroupFirstKey(config.getGroupFirstKey() == null ? as.getDefaultGroupFirstKey() : config.getGroupFirstKey());
       config.setLastValue(config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java
index 895b1ee..fb8f096 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.utils.collections.RepeatableIterator;
 import org.apache.activemq.artemis.utils.collections.ResettableIterator;
 
 import java.util.Set;
+import java.util.stream.Stream;
 
 public interface QueueConsumers<T extends PriorityAware> extends Iterable<T>, RepeatableIterator<T>, ResettableIterator<T> {
 
@@ -34,4 +35,6 @@ public interface QueueConsumers<T extends PriorityAware> extends Iterable<T>, Re
 
    boolean isEmpty();
 
+   Stream<T> stream();
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
index 0495c51..1afca46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.Spliterator;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 /**
  * This class's purpose is to hold the consumers.
@@ -105,6 +106,11 @@ public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsume
    }
 
    @Override
+   public Stream<T> stream() {
+      return unmodifiableConsumers.stream();
+   }
+
+   @Override
    public Iterator<T> iterator() {
       return unmodifiableConsumers.iterator();
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 9105fc2..b44d1e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -254,6 +254,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private volatile boolean groupRebalance;
 
+   private volatile boolean groupRebalancePauseDispatch;
+
    private volatile int groupBuckets;
 
    private volatile SimpleString groupFirstKey;
@@ -641,6 +643,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       this.groupRebalance = queueConfiguration.isGroupRebalance() == null ? ActiveMQDefaultConfiguration.getDefaultGroupRebalance() : queueConfiguration.isGroupRebalance();
 
+      this.groupRebalancePauseDispatch = queueConfiguration.isGroupRebalancePauseDispatch() == null ? ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch() : queueConfiguration.isGroupRebalancePauseDispatch();
+
       this.groupBuckets = queueConfiguration.getGroupBuckets() == null ? ActiveMQDefaultConfiguration.getDefaultGroupBuckets() : queueConfiguration.getGroupBuckets();
 
       this.groups = groupMap(this.groupBuckets);
@@ -918,6 +922,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public boolean isGroupRebalancePauseDispatch() {
+      return groupRebalancePauseDispatch;
+   }
+
+   @Override
+   public synchronized void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDispatch) {
+      this.groupRebalancePauseDispatch = groupRebalancePauseDispatch;
+   }
+
+   @Override
    public SimpleString getGroupFirstKey() {
       return groupFirstKey;
    }
@@ -1330,16 +1344,32 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (canDispatch) {
          return true;
       } else {
+
+         //Don't switch back to dispatching until in-flight messages are handled; this avoids issues with out-of-order messages.
+         if (inFlightMessages()) {
+            return false;
+         }
+
+         if (consumers.size() >= consumersBeforeDispatch) {
+            if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {
+               dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
+            }
+            return true;
+         }
+
          long currentDispatchStartTime = dispatchStartTimeUpdater.get(this);
          if (currentDispatchStartTime != -1 && currentDispatchStartTime < System.currentTimeMillis()) {
             dispatchingUpdater.set(this, BooleanUtil.toInt(true));
             return true;
-         } else {
-            return false;
          }
+         return false;
       }
    }
 
+   private boolean inFlightMessages() {
+      return consumers.stream().mapToInt(c -> c.consumer().getDeliveringMessages().size()).sum() != 0;
+   }
+
    @Override
    public void addConsumer(final Consumer consumer) throws Exception {
       if (logger.isDebugEnabled()) {
@@ -1362,21 +1392,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             }
 
             cancelRedistributor();
+
+            if (groupRebalance) {
+               if (groupRebalancePauseDispatch) {
+                  stopDispatch();
+               }
+               groups.removeAll();
+            }
+
             ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer);
             if (consumers.add(newConsumerHolder)) {
-               int currentConsumerCount = consumers.size();
                if (delayBeforeDispatch >= 0) {
                   dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
                }
-               if (currentConsumerCount >= consumersBeforeDispatch) {
-                  if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {
-                     dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
-                  }
-               }
-            }
 
-            if (groupRebalance) {
-               groups.removeAll();
             }
 
             if (refCountForConsumers != null) {
@@ -1423,9 +1452,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             if (consumerRemoved) {
                consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis());
-               boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(consumers.size() != 0));
-               if (stopped) {
-                  dispatchStartTimeUpdater.set(this, -1);
+               if (consumers.size() == 0) {
+                  stopDispatch();
                }
             }
 
@@ -1446,6 +1474,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   private void stopDispatch() {
+      boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(false));
+      if (stopped) {
+         dispatchStartTimeUpdater.set(this, -1);
+      }
+   }
+
    private boolean checkConsumerDirectDeliver() {
       if (consumers.isEmpty()) {
          return false;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 4c9205c..e713b08 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -169,6 +169,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Boolean defaultGroupRebalance = null;
 
+   private Boolean defaultGroupRebalancePauseDispatch = null;
+
    private Integer defaultGroupBuckets = null;
 
    private SimpleString defaultGroupFirstKey = null;
@@ -311,6 +313,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.defaultAddressRoutingType = other.defaultAddressRoutingType;
       this.defaultConsumerWindowSize = other.defaultConsumerWindowSize;
       this.defaultGroupRebalance = other.defaultGroupRebalance;
+      this.defaultGroupRebalancePauseDispatch = other.defaultGroupRebalancePauseDispatch;
       this.defaultGroupBuckets = other.defaultGroupBuckets;
       this.defaultGroupFirstKey = other.defaultGroupFirstKey;
       this.defaultRingSize = other.defaultRingSize;
@@ -840,6 +843,21 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
    }
 
    /**
+    * @return the defaultGroupRebalancePauseDispatch
+    */
+   public boolean isDefaultGroupRebalancePauseDispatch() {
+      return defaultGroupRebalancePauseDispatch != null ? defaultGroupRebalancePauseDispatch : ActiveMQDefaultConfiguration.getDefaultGroupRebalancePauseDispatch();
+   }
+
+   /**
+    * @param defaultGroupRebalancePauseDispatch the defaultGroupRebalancePauseDispatch to set
+    */
+   public AddressSettings setDefaultGroupRebalancePauseDispatch(boolean defaultGroupRebalancePauseDispatch) {
+      this.defaultGroupRebalancePauseDispatch = defaultGroupRebalancePauseDispatch;
+      return this;
+   }
+
+   /**
     * @return the defaultGroupBuckets
     */
    public int getDefaultGroupBuckets() {
@@ -1053,6 +1071,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (defaultGroupRebalance == null) {
          defaultGroupRebalance = merged.defaultGroupRebalance;
       }
+      if (defaultGroupRebalancePauseDispatch == null) {
+         defaultGroupRebalancePauseDispatch = merged.defaultGroupRebalancePauseDispatch;
+      }
       if (defaultGroupBuckets == null) {
          defaultGroupBuckets = merged.defaultGroupBuckets;
       }
@@ -1294,6 +1315,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (buffer.readableBytes() > 0) {
          enableMetrics = BufferHelper.readNullableBoolean(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
+      }
+
    }
 
    @Override
@@ -1356,7 +1382,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableBoolean(autoCreateExpiryResources) +
          SimpleString.sizeofNullableString(expiryQueuePrefix) +
          SimpleString.sizeofNullableString(expiryQueueSuffix) +
-         BufferHelper.sizeOfNullableBoolean(enableMetrics);
+         BufferHelper.sizeOfNullableBoolean(enableMetrics) +
+         BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch);
    }
 
    @Override
@@ -1480,6 +1507,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       BufferHelper.writeNullableLong(buffer, maxExpiryDelay);
 
       BufferHelper.writeNullableBoolean(buffer, enableMetrics);
+
+      BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch);
+
    }
 
    /* (non-Javadoc)
@@ -1539,6 +1569,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
       result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
       result = prime * result + ((defaultGroupRebalance == null) ? 0 : defaultGroupRebalance.hashCode());
+      result = prime * result + ((defaultGroupRebalancePauseDispatch == null) ? 0 : defaultGroupRebalancePauseDispatch.hashCode());
       result = prime * result + ((defaultGroupBuckets == null) ? 0 : defaultGroupBuckets.hashCode());
       result = prime * result + ((defaultGroupFirstKey == null) ? 0 : defaultGroupFirstKey.hashCode());
       result = prime * result + ((defaultRingSize == null) ? 0 : defaultRingSize.hashCode());
@@ -1825,6 +1856,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       } else if (!defaultGroupRebalance.equals(other.defaultGroupRebalance))
          return false;
 
+      if (defaultGroupRebalancePauseDispatch == null) {
+         if (other.defaultGroupRebalancePauseDispatch != null)
+            return false;
+      } else if (!defaultGroupRebalancePauseDispatch.equals(other.defaultGroupRebalancePauseDispatch))
+         return false;
+
       if (defaultGroupBuckets == null) {
          if (other.defaultGroupBuckets != null)
             return false;
@@ -1996,6 +2033,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          defaultConsumerWindowSize +
          ", defaultGroupRebalance=" +
          defaultGroupRebalance +
+         ", defaultGroupRebalancePauseDispatch=" +
+         defaultGroupRebalancePauseDispatch +
          ", defaultGroupBuckets=" +
          defaultGroupBuckets +
          ", defaultGroupFirstKey=" +
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 05c91fd..6387a69 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -526,6 +526,7 @@
                         <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
+                        <xsd:attribute name="group-rebalance-pause-dispatch" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
                         <xsd:attribute name="group-first-key" type="xsd:string" use="optional"/>
                         <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
@@ -3365,6 +3366,14 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-group-rebalance-pause-dispatch" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     whether to pause dispatch when rebalancing groups
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="default-group-buckets" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java
index 2ac3a2a..92ee1eb 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java
@@ -51,6 +51,7 @@ public class QueueBindingEncodingTest extends Assert {
       final boolean configurationManaged = RandomUtil.randomBoolean();
       final long ringSize = RandomUtil.randomLong();
       final boolean enabled = RandomUtil.randomBoolean();
+      final boolean groupRebalancePauseDispatch = RandomUtil.randomBoolean();
 
       PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name,
                                                                                    address,
@@ -62,6 +63,7 @@ public class QueueBindingEncodingTest extends Assert {
                                                                                    enabled,
                                                                                    exclusive,
                                                                                    groupRebalance,
+                                                                                   groupRebalancePauseDispatch,
                                                                                    groupBuckets,
                                                                                    groupFirstKey,
                                                                                    lastValue,
@@ -105,5 +107,8 @@ public class QueueBindingEncodingTest extends Assert {
       assertEquals(routingType, decoding.getRoutingType());
       assertEquals(configurationManaged, decoding.isConfigurationManaged());
       assertEquals(ringSize, decoding.getRingSize());
+
+      assertEquals(groupRebalancePauseDispatch, decoding.isGroupRebalancePauseDispatch());
+
    }
 }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 40c472c..00b205e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -927,6 +927,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public boolean isGroupRebalancePauseDispatch() {
+         return false;
+      }
+
+      @Override
+      public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) {
+
+      }
+
+      @Override
       public SimpleString getGroupFirstKey() {
          return null;
       }
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index a3e5812..5a96eef 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -518,6 +518,7 @@
                         <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
+                        <xsd:attribute name="group-rebalance-pause-dispatch" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
                         <xsd:attribute name="group-first-key" type="xsd:string" use="optional"/>
                         <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
@@ -3158,6 +3159,14 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-group-rebalance-pause-dispatch" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     whether to pause dispatch when rebalancing groups
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="default-group-buckets" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
diff --git a/docs/user-manual/en/message-grouping.md b/docs/user-manual/en/message-grouping.md
index 405b868..e71efc7 100644
--- a/docs/user-manual/en/message-grouping.md
+++ b/docs/user-manual/en/message-grouping.md
@@ -152,10 +152,13 @@ via the management API or managment console by invoking `resetAllGroups`
 
 By setting `group-rebalance` to `true` at the queue level, every time a consumer is added it will trigger a rebalance/reset of the groups.
 
+As noted above, when a group rebalance is done there is a risk that you have in-flight messages still being processed. By default, the broker will continue to dispatch whilst the rebalance is occurring. To ensure that in-flight messages are processed before new messages are dispatched to different consumers after the rebalance,
+you can set `group-rebalance-pause-dispatch` to `true`, which will cause dispatch to pause whilst the rebalance occurs, until all in-flight messages are processed.
+
 ```xml
 <address name="foo.bar">
    <multicast>
-      <queue name="orders1" group-rebalance="true"/>
+      <queue name="orders1" group-rebalance="true" group-rebalance-pause-dispatch="true"/>
    </multicast>
 </address>
 ```
@@ -164,8 +167,8 @@ Or on auto-create when using the JMS Client by using address parameters when
 creating the destination used by the consumer.
 
 ```java
-Queue queue = session.createQueue("my.destination.name?group-rebalance=true");
-Topic topic = session.createTopic("my.destination.name?group-rebalance=true");
+Queue queue = session.createQueue("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true");
+Topic topic = session.createTopic("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true");
 ```
 
 Also the default for all queues under and address can be defaulted using the 
@@ -174,11 +177,13 @@ Also the default for all queues under and address can be defaulted using the
 ```xml
 <address-setting match="my.address">
    <default-group-rebalance>true</default-group-rebalance>
+   <default-group-rebalance-pause-dispatch>true</default-group-rebalance-pause-dispatch>
 </address-setting>
 ```
 
 
 By default, `default-group-rebalance` is `false` meaning this is disabled/off.
+By default, `default-group-rebalance-pause-dispatch` is `false` meaning this is disabled/off.
 
 
 #### Group Buckets
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
index 284d0e8..7b033b1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
@@ -518,6 +518,155 @@ public class GroupingTest extends JMSTestBase {
       ctx.close();
    }
 
+   /**
+    * This test ensures that when group rebalance and pause dispatch are both enabled,
+    * the broker pauses dispatch of new messages to consumers whilst rebalancing, and waits for existing in-flight messages to be handled before restarting dispatch with the new rebalanced group allocations.
+    * This allows us to provide a guarantee of message ordering even with rebalance, at the expense that during rebalance dispatch will pause until all consumers with in-flight messages have handled them.
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testGroupRebalancePauseDispatch() throws Exception {
+      ConnectionFactory fact = getCF();
+      Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup());
+      Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null);
+      String testQueueName = getName() + "_group_rebalance";
+
+      server.createQueue(new QueueConfiguration(testQueueName).setRoutingType(RoutingType.ANYCAST).setGroupRebalance(true).setGroupRebalancePauseDispatch(true));
+
+      JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));
+
+      Queue testQueue = ctx.createQueue(testQueueName);
+
+
+      final String groupID1 = "groupA";
+      final String groupID2 = "groupB";
+      final String groupID3 = "groupC";
+
+
+      JMSProducer producer1 = ctx.createProducer().setProperty("JMSXGroupID", groupID1);
+      JMSProducer producer2 = ctx.createProducer().setProperty("JMSXGroupID", groupID2);
+      JMSProducer producer3 = ctx.createProducer().setProperty("JMSXGroupID", groupID3);
+
+      JMSConsumer consumer1 = ctx.createConsumer(testQueue);
+      JMSConsumer consumer2 = ctx.createConsumer(testQueue);
+
+      ctx.start();
+
+      for (int j = 0; j < 10; j++) {
+         send(ctx, testQueue, groupID1, producer1, j);
+      }
+      for (int j = 10; j < 20; j++) {
+         send(ctx, testQueue, groupID2, producer2, j);
+      }
+      for (int j = 20; j < 30; j++) {
+         send(ctx, testQueue, groupID3, producer3, j);
+      }
+
+      ctx.commit();
+
+      //First set of msgs should go to the first consumer only
+      for (int j = 0; j < 10; j++) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1);
+      }
+      ctx.commit();
+
+
+      //Second set of msgs should go to the second consumers only
+      for (int j = 10; j < 20; j++) {
+         TextMessage tm = (TextMessage) consumer2.receive(10000);
+
+         assertNotNull(tm);
+
+         tm.acknowledge();
+
+         assertEquals("Message" + j, tm.getText());
+
+         assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2);
+      }
+      ctx.commit();
+
+
+      //Add a new consumer while the third set has not yet been consumed (so it is still in flight); this should cause a rebalance
+      JMSConsumer consumer3 = ctx.createConsumer(testQueue);
+
+      //Send next set of messages
+      for (int j = 0; j < 10; j++) {
+         send(ctx, testQueue, groupID1, producer1, j);
+      }
+      for (int j = 10; j < 20; j++) {
+         send(ctx, testQueue, groupID2, producer2, j);
+      }
+      for (int j = 20; j < 30; j++) {
+         send(ctx, testQueue, groupID3, producer3, j);
+      }
+      ctx.commit();
+
+      //Ensure we don't get anything on the other consumers whilst we rebalance and there are in-flight messages, i.e. ensure the ordering guarantee.
+      assertNull(consumer2.receiveNoWait());
+      assertNull(consumer3.receiveNoWait());
+
+      //Ensure the inflight set of msgs should go to the first consumer only
+      for (int j = 20; j < 30; j++) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+         assertNotNull(tm);
+
+         tm.acknowledge();
+
+         assertEquals("Message" + j, tm.getText());
+
+         assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3);
+      }
+      ctx.commit();
+
+      //Now that we have cleared the in-flight messages, expect that consumers 1, 2 and 3 are rebalanced and the messages sent earlier are received.
+
+      //First set of msgs should go to the first consumer only
+      for (int j = 0; j < 10; j++) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1);
+      }
+
+      //Second set of msgs should go to the second consumers only
+      for (int j = 10; j < 20; j++) {
+         TextMessage tm = (TextMessage) consumer2.receive(10000);
+
+         assertNotNull(tm);
+
+         tm.acknowledge();
+
+         assertEquals("Message" + j, tm.getText());
+
+         assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2);
+      }
+
+      //Third set of msgs should now go to the third consumer now
+      for (int j = 20; j < 30; j++) {
+         TextMessage tm = (TextMessage) consumer3.receive(10000);
+
+         assertNotNull(tm);
+
+         tm.acknowledge();
+
+         assertEquals("Message" + j, tm.getText());
+
+         assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3);
+      }
+
+      ctx.commit();
+
+      ctx.close();
+   }
+
+
 
    @Test
    public void testGroupFirstKey() throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index d969998..663954b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -96,6 +96,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public boolean isGroupRebalancePauseDispatch() {
+            return (Boolean) proxy.retrieveAttributeValue("groupRebalancePauseDispatch");
+         }
+
+         @Override
          public int getGroupBuckets() {
             return (Integer) proxy.retrieveAttributeValue("groupBuckets", Integer.class);
          }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index b1896bd..1ab01c1 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -188,6 +188,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public boolean isGroupRebalancePauseDispatch() {
+      return false;
+   }
+
+   @Override
+   public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) {
+
+   }
+
+   @Override
    public SimpleString getGroupFirstKey() {
       return null;
    }


Mime
View raw message