geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjra...@apache.org
Subject [geode] branch develop updated: GEODE-6551: Fix Constraints Check in Alter Region (#3349)
Date Fri, 05 Apr 2019 11:58:44 GMT
This is an automated email from the ASF dual-hosted git repository.

jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5c7599c  GEODE-6551: Fix Constraints Check in Alter Region (#3349)
5c7599c is described below

commit 5c7599c921114b5354e04e95c5990c43c9cd7a8f
Author: Juan José Ramos <jujoramos@users.noreply.github.com>
AuthorDate: Fri Apr 5 12:58:30 2019 +0100

    GEODE-6551: Fix Constraints Check in Alter Region (#3349)
    
    * GEODE-6551: Fix Constraints Check in Alter Region
    
    Some persistence constraints were not correctly validated for a
    partitioned region before associating an async-event-queue and/or
    gateway-sender, leaving the region and the cluster configuration
    service inconsistent.
    
    - Fixed minor warnings.
    - Fixed the internal validation implemented by PartitionedRegion.
    - Some method signatures were changed from "parallelGatewaySender" to
    "parallelAsynchronousEventDispatcher" as they are used for both
    gateway-senders and async-event-queues.
---
 .../geode/internal/cache/PartitionedRegion.java    | 112 ++++---
 .../cli/functions/RegionAlterFunction.java         |  98 +++---
 .../internal/cache/PartitionedRegionTest.java      | 142 ++++++---
 .../cli/functions/RegionAlterFunctionTest.java     |  34 +-
 .../cache/wan/misc/WanValidationsDUnitTest.java    |  49 ++-
 .../cli/commands/AlterRegionCommandDUnitTest.java  | 344 +++++++++++++++++++++
 6 files changed, 614 insertions(+), 165 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 3deb65d..2fdd4cb 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -90,6 +90,7 @@ import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
 import org.apache.geode.cache.TransactionDataRebalancedException;
 import org.apache.geode.cache.TransactionException;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.client.internal.ClientMetadataService;
 import org.apache.geode.cache.execute.EmptyRegionFunctionException;
@@ -231,6 +232,7 @@ import org.apache.geode.internal.cache.versions.VersionStamp;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
@@ -1221,10 +1223,11 @@ public class PartitionedRegion extends LocalRegion
     }
   }
 
-  public void updatePRConfigWithNewSetOfGatewaySenders(Set<String> gatewaySendersToAdd) {
+  public void updatePRConfigWithNewSetOfAsynchronousEventDispatchers(
+      Set<String> asynchronousEventDispatchers) {
     PartitionRegionHelper.assignBucketsToPartitions(this);
     updatePartitionRegionConfig(prConfig -> {
-      prConfig.setGatewaySenderIds(gatewaySendersToAdd);
+      prConfig.setGatewaySenderIds(asynchronousEventDispatchers);
     });
   }
 
@@ -1375,7 +1378,7 @@ public class PartitionedRegion extends LocalRegion
       prConfig = getPRRoot().get(getRegionIdentifier());
 
       if (prConfig == null) {
-        validateParallelGatewaySenderIds();
+        validateParallelAsynchronousEventDispatcherIds();
         this.partitionedRegionId = generatePRId(getSystem());
         prConfig = new PartitionRegionConfig(this.partitionedRegionId, this.getFullPath(),
             prAttribs, this.getScope(), getAttributes().getEvictionAttributes(),
@@ -1480,35 +1483,77 @@ public class PartitionedRegion extends LocalRegion
     }
   }
 
-  public void validateParallelGatewaySenderIds() throws PRLocallyDestroyedException {
-    validateParallelGatewaySenderIds(this.getParallelGatewaySenderIds());
+  public void validateParallelAsynchronousEventDispatcherIds() throws PRLocallyDestroyedException {
+    validateParallelAsynchronousEventDispatcherIds(this.getParallelGatewaySenderIds());
   }
 
-  /*
-   * filterOutNonParallelGatewaySenders takes in a set of gateway sender IDs and returns
-   * a set of parallel gateway senders present in the input set.
+  /**
+   * Filters out non parallel GatewaySenders.
+   *
+   * @param senderIds set of gateway sender IDs.
+   * @return set of parallel gateway senders present in the input set.
    */
   public Set<String> filterOutNonParallelGatewaySenders(Set<String> senderIds) {
-    Set<String> allParallelSenders = cache.getAllGatewaySenders().parallelStream()
-        .filter(GatewaySender::isParallel).map(GatewaySender::getId).collect(toSet());
-    Set<String> parallelSenders = new HashSet<>();
-    senderIds.parallelStream().forEach(gatewaySenderId -> {
-      if (allParallelSenders.contains(gatewaySenderId)) {
-        parallelSenders.add(gatewaySenderId);
-      }
-    });
+    Set<String> allParallelSenders = cache.getAllGatewaySenders()
+        .parallelStream()
+        .filter(GatewaySender::isParallel)
+        .map(GatewaySender::getId)
+        .collect(toSet());
+
+    Set<String> parallelSenders = new HashSet<>(senderIds);
+    parallelSenders.retainAll(allParallelSenders);
+
     return parallelSenders;
   }
 
-  public void validateParallelGatewaySenderIds(Set<String> parallelGatewaySenderIds)
-      throws PRLocallyDestroyedException {
-    for (String senderId : parallelGatewaySenderIds) {
-      for (PartitionRegionConfig config : getPRRoot().values()) {
-        if (config.getGatewaySenderIds().contains(senderId)) {
+  /**
+   * Filters out non parallel AsyncEventQueues.
+   *
+   * @param queueIds set of async-event-queue IDs.
+   * @return set of parallel async-event-queues present in the input set.
+   */
+  public Set<String> filterOutNonParallelAsyncEventQueues(Set<String> queueIds) {
+    Set<String> allParallelQueues = cache.getAsyncEventQueues()
+        .parallelStream()
+        .filter(AsyncEventQueue::isParallel)
+        .map(asyncEventQueue -> AsyncEventQueueImpl
+            .getAsyncEventQueueIdFromSenderId(asyncEventQueue.getId()))
+        .collect(toSet());
+
+    Set<String> parallelAsyncEventQueues = new HashSet<>(queueIds);
+    parallelAsyncEventQueues.retainAll(allParallelQueues);
+
+    return parallelAsyncEventQueues;
+  }
+
+  public void validateParallelAsynchronousEventDispatcherIds(
+      Set<String> asynchronousEventDispatcherIds) throws PRLocallyDestroyedException {
+    for (String dispatcherId : asynchronousEventDispatcherIds) {
+      GatewaySender sender = getCache().getGatewaySender(dispatcherId);
+      AsyncEventQueue asyncEventQueue = getCache()
+          .getAsyncEventQueue(AsyncEventQueueImpl.getAsyncEventQueueIdFromSenderId(dispatcherId));
+
+      // Can't attach a non persistent parallel gateway / async-event-queue to a persistent
+      // partitioned region.
+      if (getDataPolicy().withPersistence()) {
+        if ((sender != null) && (!sender.isPersistenceEnabled())) {
+          throw new GatewaySenderConfigurationException(String.format(
+              "Non persistent gateway sender %s can not be attached to persistent region %s",
+              dispatcherId, getFullPath()));
+        } else if ((asyncEventQueue != null) && (!asyncEventQueue.isPersistent())) {
+          throw new AsyncEventQueueConfigurationException(String.format(
+              "Non persistent asynchronous event queue %s can not be attached to persistent region %s",
+              dispatcherId, getFullPath()));
+        }
+      }
+
+      for (PartitionRegionConfig config : this.prRoot.values()) {
+        if (config.getGatewaySenderIds().contains(dispatcherId)) {
           if (this.getFullPath().equals(config.getFullPath())) {
             // The sender is already attached to this region
             continue;
           }
+
           Map<String, PartitionedRegion> colocationMap =
               ColocationHelper.getAllColocationRegions(this);
           if (!colocationMap.isEmpty()) {
@@ -1518,28 +1563,23 @@ public class PartitionedRegion extends LocalRegion
               int prID = config.getPRId();
               PartitionedRegion colocatedPR = PartitionedRegion.getPRFromId(prID);
               PartitionedRegion leader = ColocationHelper.getLeaderRegion(colocatedPR);
+
               if (colocationMap.containsValue(leader)) {
                 continue;
               } else {
-                throw new IllegalStateException(
-                    String.format(
-                        "Non colocated regions %s, %s cannot have the same parallel %s id %s configured.",
-                        new Object[] {this.getFullPath(), config.getFullPath(),
-                            senderId.contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX)
-                                ? "async event queue" : "gateway sender",
-                            senderId}));
+                throw new IllegalStateException(String.format(
+                    "Non colocated regions %s, %s cannot have the same parallel %s id %s configured.",
+                    this.getFullPath(), config.getFullPath(),
+                    (asyncEventQueue != null ? "async event queue" : "gateway sender"),
+                    dispatcherId));
               }
             }
           } else {
-            throw new IllegalStateException(
-                String.format(
-                    "Non colocated regions %s, %s cannot have the same parallel %s id %s configured.",
-                    new Object[] {this.getFullPath(), config.getFullPath(),
-                        senderId.contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX)
-                            ? "async event queue" : "gateway sender",
-                        senderId}));
+            throw new IllegalStateException(String.format(
+                "Non colocated regions %s, %s cannot have the same parallel %s id %s configured.",
+                this.getFullPath(), config.getFullPath(),
+                (asyncEventQueue != null ? "async event queue" : "gateway sender"), dispatcherId));
           }
-
         }
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunction.java
index bd148d9..0db4e8c 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunction.java
@@ -18,6 +18,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 import org.apache.logging.log4j.Logger;
 
@@ -47,9 +48,7 @@ import org.apache.geode.management.internal.configuration.domain.DeclarableTypeI
  */
 public class RegionAlterFunction extends CliFunction<RegionConfig> {
   private static final Logger logger = LogService.getLogger();
-
   private static final long serialVersionUID = -4846425364943216425L;
-  private static final String NULLSTR = "null";
 
   @Override
   public boolean isHA() {
@@ -57,25 +56,35 @@ public class RegionAlterFunction extends CliFunction<RegionConfig> {
   }
 
   @Override
+  public String getId() {
+    return RegionAlterFunction.class.getName();
+  }
+
+  @Override
   public CliFunctionResult executeFunction(FunctionContext<RegionConfig> context) {
     Cache cache = ((InternalCache) context.getCache()).getCacheForProcessingClientRequests();
     RegionConfig deltaConfig = context.getArguments();
     alterRegion(cache, deltaConfig);
+
     return new CliFunctionResult(context.getMemberName(), Result.Status.OK,
         String.format("Region %s altered", deltaConfig.getName()));
   }
 
+  private static <T> Predicate<T> not(Predicate<T> t) {
+    return t.negate();
+  }
+
   void alterRegion(Cache cache, RegionConfig deltaConfig) {
     final String regionPathString = deltaConfig.getName();
 
     AbstractRegion region = (AbstractRegion) cache.getRegion(regionPathString);
     if (region == null) {
-      throw new IllegalArgumentException(String.format(
-          "Region does not exist: %s", regionPathString));
+      throw new IllegalArgumentException(
+          String.format("Region does not exist: %s", regionPathString));
     }
 
+    AttributesMutator<?, ?> mutator = region.getAttributesMutator();
     RegionAttributesType regionAttributes = deltaConfig.getRegionAttributes();
-    AttributesMutator mutator = region.getAttributesMutator();
 
     if (regionAttributes.isCloningEnabled() != null) {
       mutator.setCloningEnabled(regionAttributes.isCloningEnabled());
@@ -94,55 +103,51 @@ public class RegionAlterFunction extends CliFunction<RegionConfig> {
 
     // Alter expiration attributes
     updateExpirationAttributes(cache, regionAttributes.getEntryIdleTime(),
-        region.getEntryIdleTimeout(), p -> mutator.setEntryIdleTimeout(p),
-        p -> mutator.setCustomEntryIdleTimeout(p));
+        region.getEntryIdleTimeout(), mutator::setEntryIdleTimeout,
+        mutator::setCustomEntryIdleTimeout);
     updateExpirationAttributes(cache, regionAttributes.getEntryTimeToLive(),
-        region.getEntryTimeToLive(), p -> mutator.setEntryTimeToLive(p),
-        p -> mutator.setCustomEntryTimeToLive(p));
+        region.getEntryTimeToLive(), mutator::setEntryTimeToLive,
+        mutator::setCustomEntryTimeToLive);
     updateExpirationAttributes(cache, regionAttributes.getRegionIdleTime(),
-        region.getRegionIdleTimeout(), p -> mutator.setRegionIdleTimeout(p), null);
+        region.getRegionIdleTimeout(), mutator::setRegionIdleTimeout, null);
     updateExpirationAttributes(cache, regionAttributes.getRegionTimeToLive(),
-        region.getRegionTimeToLive(), p -> mutator.setRegionTimeToLive(p), null);
-
+        region.getRegionTimeToLive(), mutator::setRegionTimeToLive, null);
 
     final Set<String> newGatewaySenderIds = regionAttributes.getGatewaySenderIdsAsSet();
     final Set<String> newAsyncEventQueueIds = regionAttributes.getAsyncEventQueueIdsAsSet();
 
     if (region instanceof PartitionedRegion) {
       Set<String> senderIds = new HashSet<>();
+
       if (newGatewaySenderIds != null) {
         validateParallelGatewaySenderIDs((PartitionedRegion) region, newGatewaySenderIds);
         senderIds.addAll(newGatewaySenderIds);
       } else if (region.getGatewaySenderIds() != null) {
         senderIds.addAll(region.getAllGatewaySenderIds());
       }
+
       if (newAsyncEventQueueIds != null) {
-        validateParallelGatewaySenderIDs((PartitionedRegion) region, newAsyncEventQueueIds);
+        validateParallelAsynchronousEventQueueIDs((PartitionedRegion) region,
+            newAsyncEventQueueIds);
         senderIds.addAll(newAsyncEventQueueIds);
       } else if (region.getAsyncEventQueueIds() != null) {
         senderIds.addAll(region.getAsyncEventQueueIds());
       }
-      ((PartitionedRegion) region).updatePRConfigWithNewSetOfGatewaySenders(senderIds);
+
+      ((PartitionedRegion) region)
+          .updatePRConfigWithNewSetOfAsynchronousEventDispatchers(senderIds);
     }
 
     // Alter Gateway Sender Ids
     if (newGatewaySenderIds != null) {
-      // Remove old gateway sender ids that aren't in the new list
       Set<String> oldGatewaySenderIds = region.getGatewaySenderIds();
-      if (!oldGatewaySenderIds.isEmpty()) {
-        for (String gatewaySenderId : oldGatewaySenderIds) {
-          if (!newGatewaySenderIds.contains(gatewaySenderId)) {
-            mutator.removeGatewaySenderId(gatewaySenderId);
-          }
-        }
-      }
+      // Remove old gateway sender ids that aren't in the new list
+      oldGatewaySenderIds.stream().filter(not(newGatewaySenderIds::contains))
+          .forEach(mutator::removeGatewaySenderId);
 
       // Add new gateway sender ids that don't already exist
-      for (String gatewaySenderId : newGatewaySenderIds) {
-        if (!oldGatewaySenderIds.contains(gatewaySenderId)) {
-          mutator.addGatewaySenderId(gatewaySenderId);
-        }
-      }
+      newGatewaySenderIds.stream().filter(not(oldGatewaySenderIds::contains))
+          .forEach(mutator::addGatewaySenderId);
 
       if (logger.isDebugEnabled()) {
         logger.debug("Region successfully altered - gateway sender IDs");
@@ -151,23 +156,15 @@ public class RegionAlterFunction extends CliFunction<RegionConfig> {
 
     // Alter Async Queue Ids
     if (newAsyncEventQueueIds != null) {
+      Set<String> oldAsyncEventQueueIds = region.getAsyncEventQueueIds();
 
       // Remove old async event queue ids that aren't in the new list
-      Set<String> oldAsyncEventQueueIds = region.getAsyncEventQueueIds();
-      if (!oldAsyncEventQueueIds.isEmpty()) {
-        for (String asyncEventQueueId : oldAsyncEventQueueIds) {
-          if (!newAsyncEventQueueIds.contains(asyncEventQueueId)) {
-            mutator.removeAsyncEventQueueId(asyncEventQueueId);
-          }
-        }
-      }
+      oldAsyncEventQueueIds.stream().filter(not(newAsyncEventQueueIds::contains))
+          .forEach(mutator::removeAsyncEventQueueId);
 
       // Add new async event queue ids that don't already exist
-      for (String asyncEventQueueId : newAsyncEventQueueIds) {
-        if (!oldAsyncEventQueueIds.contains(asyncEventQueueId)) {
-          mutator.addAsyncEventQueueId(asyncEventQueueId);
-        }
-      }
+      newAsyncEventQueueIds.stream().filter(not(oldAsyncEventQueueIds::contains))
+          .forEach(mutator::addAsyncEventQueueId);
 
       if (logger.isDebugEnabled()) {
         logger.debug("Region successfully altered - async event queue IDs");
@@ -226,8 +223,7 @@ public class RegionAlterFunction extends CliFunction<RegionConfig> {
 
   private void updateExpirationAttributes(Cache cache,
       RegionAttributesType.ExpirationAttributesType newAttributes,
-      ExpirationAttributes existingAttributes,
-      Consumer<ExpirationAttributes> mutator1,
+      ExpirationAttributes existingAttributes, Consumer<ExpirationAttributes> mutator1,
       Consumer<CustomExpiry> mutator2) {
     if (newAttributes == null) {
       return;
@@ -236,6 +232,7 @@ public class RegionAlterFunction extends CliFunction<RegionConfig> {
     if (newAttributes.hasTimoutOrAction() && existingAttributes != null) {
       int existingTimeout = existingAttributes.getTimeout();
       ExpirationAction existingAction = existingAttributes.getAction();
+
       if (newAttributes.getTimeout() != null) {
         existingTimeout = Integer.parseInt(newAttributes.getTimeout());
       }
@@ -243,6 +240,7 @@ public class RegionAlterFunction extends CliFunction<RegionConfig> {
       if (newAttributes.getAction() != null) {
         existingAction = ExpirationAction.fromXmlString(newAttributes.getAction());
       }
+
       mutator1.accept(new ExpirationAttributes(existingTimeout, existingAction));
     }
 
@@ -252,6 +250,7 @@ public class RegionAlterFunction extends CliFunction<RegionConfig> {
 
     if (newAttributes.hasCustomExpiry()) {
       DeclarableType newCustomExpiry = newAttributes.getCustomExpiry();
+
       if (newCustomExpiry.equals(DeclarableType.EMPTY)) {
         mutator2.accept(null);
       } else {
@@ -264,19 +263,24 @@ public class RegionAlterFunction extends CliFunction<RegionConfig> {
     }
   }
 
-
   private void validateParallelGatewaySenderIDs(PartitionedRegion region,
       Set<String> newGatewaySenderIds) {
     try {
       Set<String> parallelSenders = region.filterOutNonParallelGatewaySenders(newGatewaySenderIds);
-      region.validateParallelGatewaySenderIds(parallelSenders);
+      region.validateParallelAsynchronousEventDispatcherIds(parallelSenders);
     } catch (PRLocallyDestroyedException e) {
       throw new IllegalStateException("Partitioned Region not found registered", e);
     }
   }
 
-  @Override
-  public String getId() {
-    return RegionAlterFunction.class.getName();
+  private void validateParallelAsynchronousEventQueueIDs(PartitionedRegion region,
+      Set<String> newAsyncEventQueueIds) {
+    try {
+      Set<String> parallelSenders =
+          region.filterOutNonParallelAsyncEventQueues(newAsyncEventQueueIds);
+      region.validateParallelAsynchronousEventDispatcherIds(parallelSenders);
+    } catch (PRLocallyDestroyedException e) {
+      throw new IllegalStateException("Partitioned Region not found registered", e);
+    }
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index fb9f37b..8c48a94 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -33,6 +34,7 @@ import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import junitparams.JUnitParamsRunner;
 import junitparams.Parameters;
@@ -45,6 +47,8 @@ import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
@@ -53,14 +57,9 @@ import org.apache.geode.test.fake.Fakes;
 
 @RunWith(JUnitParamsRunner.class)
 public class PartitionedRegionTest {
-
-  String regionName = "prTestRegion";
-
-  InternalCache internalCache;
-
-  PartitionedRegion partitionedRegion;
-
-  Properties gemfireProperties = new Properties();
+  private InternalCache internalCache;
+  private PartitionedRegion partitionedRegion;
+  private Properties gemfireProperties = new Properties();
 
   @Before
   public void setup() {
@@ -69,25 +68,38 @@ public class PartitionedRegionTest {
     InternalResourceManager resourceManager =
         mock(InternalResourceManager.class, RETURNS_DEEP_STUBS);
     when(internalCache.getInternalResourceManager()).thenReturn(resourceManager);
+    @SuppressWarnings("deprecation")
     AttributesFactory attributesFactory = new AttributesFactory();
     attributesFactory.setPartitionAttributes(
         new PartitionAttributesFactory().setTotalNumBuckets(1).setRedundantCopies(1).create());
-    partitionedRegion = new PartitionedRegion(regionName, attributesFactory.create(),
-        null, internalCache, mock(InternalRegionArguments.class));
+    partitionedRegion = new PartitionedRegion("prTestRegion", attributesFactory.create(), null,
+        internalCache, mock(InternalRegionArguments.class));
     DistributedSystem mockDistributedSystem = mock(DistributedSystem.class);
     when(internalCache.getDistributedSystem()).thenReturn(mockDistributedSystem);
     when(mockDistributedSystem.getProperties()).thenReturn(gemfireProperties);
   }
 
+  @SuppressWarnings("unused")
+  private Object[] parametersToTestUpdatePRNodeInformation() {
+    CacheLoader mockLoader = mock(CacheLoader.class);
+    CacheWriter mockWriter = mock(CacheWriter.class);
+    return new Object[] {
+        new Object[] {mockLoader, null, (byte) 0x01},
+        new Object[] {null, mockWriter, (byte) 0x02},
+        new Object[] {mockLoader, mockWriter, (byte) 0x03},
+        new Object[] {null, null, (byte) 0x00}
+    };
+  }
+
   @Test
   @Parameters(method = "parametersToTestUpdatePRNodeInformation")
   public void verifyPRConfigUpdatedAfterLoaderUpdate(CacheLoader mockLoader, CacheWriter mockWriter,
-      byte configByte) {
+      @SuppressWarnings("unused") byte configByte) {
     LocalRegion prRoot = mock(LocalRegion.class);
     PartitionRegionConfig mockConfig = mock(PartitionRegionConfig.class);
     PartitionedRegion prSpy = spy(partitionedRegion);
 
-    doReturn(prRoot).when(prSpy).getPRRoot();
+    when(prSpy.getPRRoot()).thenReturn(prRoot);
     when(prRoot.get(prSpy.getRegionIdentifier())).thenReturn(mockConfig);
 
     InternalDistributedMember ourMember = prSpy.getDistributionManager().getId();
@@ -99,8 +111,8 @@ public class PartitionedRegionTest {
     when(ourNode.getMemberId()).thenReturn(ourMember);
     when(otherNode1.getMemberId()).thenReturn(otherMember1);
     when(otherNode2.getMemberId()).thenReturn(otherMember2);
-    when(ourNode.isCacheLoaderAttached()).thenReturn(mockLoader == null ? false : true);
-    when(ourNode.isCacheWriterAttached()).thenReturn(mockWriter == null ? false : true);
+    when(ourNode.isCacheLoaderAttached()).thenReturn(mockLoader != null);
+    when(ourNode.isCacheWriterAttached()).thenReturn(mockWriter != null);
 
 
     VersionedArrayList prNodes = new VersionedArrayList();
@@ -118,7 +130,7 @@ public class PartitionedRegionTest {
     prSpy.updatePRNodeInformation();
 
     Node verifyOurNode = null;
-    assertThat(mockConfig.getNodes().contains(ourNode));
+    assertThat(mockConfig.getNodes().contains(ourNode)).isTrue();
     for (Node node : mockConfig.getNodes()) {
       if (node.getMemberId().equals(ourMember)) {
         verifyOurNode = node;
@@ -130,23 +142,12 @@ public class PartitionedRegionTest {
     verify(prRoot).put(prSpy.getRegionIdentifier(), mockConfig);
 
     assertThat(verifyOurNode).isNotNull();
-    assertThat(verifyOurNode.isCacheLoaderAttached()).isEqualTo(mockLoader == null ? false : true);
-    assertThat(verifyOurNode.isCacheWriterAttached()).isEqualTo(mockWriter == null ? false : true);
-  }
-
-  private Object[] parametersToTestUpdatePRNodeInformation() {
-    CacheLoader mockLoader = mock(CacheLoader.class);
-    CacheWriter mockWriter = mock(CacheWriter.class);
-    return new Object[] {
-        new Object[] {mockLoader, null, (byte) 0x01},
-        new Object[] {null, mockWriter, (byte) 0x02},
-        new Object[] {mockLoader, mockWriter, (byte) 0x03},
-        new Object[] {null, null, (byte) 0x00}
-    };
+    assertThat(verifyOurNode.isCacheLoaderAttached()).isEqualTo(mockLoader != null);
+    assertThat(verifyOurNode.isCacheWriterAttached()).isEqualTo(mockWriter != null);
   }
 
   @Test
-  public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() throws Exception {
+  public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
     int bucketId = 0;
     InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
     InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
@@ -155,7 +156,6 @@ public class PartitionedRegionTest {
     PartitionedRegion spyPR = spy(partitionedRegion);
     doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
     doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
-
     InternalDistributedMember memberForRegisterInterestRead =
         spyPR.getBucketNodeForReadOrWrite(bucketId, clientEvent);
 
@@ -164,8 +164,7 @@ public class PartitionedRegionTest {
   }
 
   @Test
-  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeForNonRegisterInterest()
-      throws Exception {
+  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeForNonRegisterInterest() {
     int bucketId = 0;
     InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
     InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
@@ -174,7 +173,6 @@ public class PartitionedRegionTest {
     PartitionedRegion spyPR = spy(partitionedRegion);
     doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
     doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
-
     InternalDistributedMember memberForRegisterInterestRead =
         spyPR.getBucketNodeForReadOrWrite(bucketId, clientEvent);
 
@@ -183,15 +181,13 @@ public class PartitionedRegionTest {
   }
 
   @Test
-  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventIsNotPresent()
-      throws Exception {
+  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventIsNotPresent() {
     int bucketId = 0;
     InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
     InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
     PartitionedRegion spyPR = spy(partitionedRegion);
     doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
     doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
-
     InternalDistributedMember memberForRegisterInterestRead =
         spyPR.getBucketNodeForReadOrWrite(bucketId, null);
 
@@ -200,8 +196,7 @@ public class PartitionedRegionTest {
   }
 
   @Test
-  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventOperationIsNotPresent()
-      throws Exception {
+  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventOperationIsNotPresent() {
     int bucketId = 0;
     InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
     InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
@@ -210,7 +205,6 @@ public class PartitionedRegionTest {
     PartitionedRegion spyPR = spy(partitionedRegion);
     doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
     doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
-
     InternalDistributedMember memberForRegisterInterestRead =
         spyPR.getBucketNodeForReadOrWrite(bucketId, null);
 
@@ -221,18 +215,15 @@ public class PartitionedRegionTest {
   @Test
   public void updateBucketMapsForInterestRegistrationWithSetOfKeysFetchesPrimaryBucketsForRead() {
     Integer[] bucketIds = new Integer[] {0, 1};
-
     InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
     InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
     PartitionedRegion spyPR = spy(partitionedRegion);
     doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(anyInt(), isNull());
     doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(anyInt());
-    HashMap<InternalDistributedMember, HashSet<Integer>> nodeToBuckets =
-        new HashMap<InternalDistributedMember, HashSet<Integer>>();
-    Set buckets = Arrays.stream(bucketIds).collect(Collectors.toCollection(HashSet::new));
+    HashMap<InternalDistributedMember, HashSet<Integer>> nodeToBuckets = new HashMap<>();
+    Set<Integer> buckets = Arrays.stream(bucketIds).collect(Collectors.toCollection(HashSet::new));
 
     spyPR.updateNodeToBucketMap(nodeToBuckets, buckets);
-
     verify(spyPR, times(2)).getNodeForBucketWrite(anyInt(), isNull());
   }
 
@@ -245,15 +236,70 @@ public class PartitionedRegionTest {
     PartitionedRegion spyPR = spy(partitionedRegion);
     doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(anyInt(), isNull());
     doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(anyInt());
-    HashMap<InternalDistributedMember, HashMap<Integer, HashSet>> nodeToBuckets =
-        new HashMap<InternalDistributedMember, HashMap<Integer, HashSet>>();
+    HashMap<InternalDistributedMember, HashMap<Integer, HashSet>> nodeToBuckets = new HashMap<>();
     HashSet buckets = Arrays.stream(bucketIds).collect(Collectors.toCollection(HashSet::new));
     HashMap<Integer, HashSet> bucketKeys = new HashMap<>();
-    bucketKeys.put(Integer.valueOf(0), buckets);
+    bucketKeys.put(0, buckets);
 
     spyPR.updateNodeToBucketMap(nodeToBuckets, bucketKeys);
-
     verify(spyPR, times(1)).getNodeForBucketWrite(anyInt(), isNull());
   }
 
+  @Test
+  public void filterOutNonParallelGatewaySendersShouldReturnCorrectly() {
+    GatewaySender parallelSender = mock(GatewaySender.class);
+    when(parallelSender.isParallel()).thenReturn(true);
+    when(parallelSender.getId()).thenReturn("parallel");
+    GatewaySender anotherParallelSender = mock(GatewaySender.class);
+    when(anotherParallelSender.isParallel()).thenReturn(true);
+    when(anotherParallelSender.getId()).thenReturn("anotherParallel");
+    GatewaySender serialSender = mock(GatewaySender.class);
+    when(serialSender.isParallel()).thenReturn(false);
+    when(serialSender.getId()).thenReturn("serial");
+    Set<GatewaySender> mockSenders =
+        Stream.of(parallelSender, anotherParallelSender, serialSender).collect(Collectors.toSet());
+
+    when(internalCache.getAllGatewaySenders()).thenReturn(mockSenders);
+    assertThat(partitionedRegion
+        .filterOutNonParallelGatewaySenders(Stream.of("serial").collect(Collectors.toSet())))
+            .isEmpty();
+    assertThat(partitionedRegion
+        .filterOutNonParallelGatewaySenders(Stream.of("unknownSender").collect(Collectors.toSet())))
+            .isEmpty();
+    assertThat(partitionedRegion.filterOutNonParallelGatewaySenders(
+        Stream.of("parallel", "serial").collect(Collectors.toSet()))).isNotEmpty()
+            .containsExactly("parallel");
+    assertThat(partitionedRegion.filterOutNonParallelGatewaySenders(
+        Stream.of("parallel", "serial", "anotherParallel").collect(Collectors.toSet())))
+            .isNotEmpty().containsExactly("parallel", "anotherParallel");
+  }
+
+  @Test
+  public void filterOutNonParallelAsyncEventQueuesShouldReturnCorrectly() {
+    AsyncEventQueue parallelQueue = mock(AsyncEventQueue.class);
+    when(parallelQueue.isParallel()).thenReturn(true);
+    when(parallelQueue.getId()).thenReturn(getSenderIdFromAsyncEventQueueId("parallel"));
+    AsyncEventQueue anotherParallelQueue = mock(AsyncEventQueue.class);
+    when(anotherParallelQueue.isParallel()).thenReturn(true);
+    when(anotherParallelQueue.getId())
+        .thenReturn(getSenderIdFromAsyncEventQueueId("anotherParallel"));
+    AsyncEventQueue serialQueue = mock(AsyncEventQueue.class);
+    when(serialQueue.isParallel()).thenReturn(false);
+    when(serialQueue.getId()).thenReturn(getSenderIdFromAsyncEventQueueId("serial"));
+    Set<AsyncEventQueue> mockQueues =
+        Stream.of(parallelQueue, anotherParallelQueue, serialQueue).collect(Collectors.toSet());
+
+    when(internalCache.getAsyncEventQueues()).thenReturn(mockQueues);
+    assertThat(partitionedRegion
+        .filterOutNonParallelAsyncEventQueues(Stream.of("serial").collect(Collectors.toSet())))
+            .isEmpty();
+    assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(
+        Stream.of("unknownSender").collect(Collectors.toSet()))).isEmpty();
+    assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(
+        Stream.of("parallel", "serial").collect(Collectors.toSet()))).isNotEmpty()
+            .containsExactly("parallel");
+    assertThat(partitionedRegion.filterOutNonParallelAsyncEventQueues(
+        Stream.of("parallel", "serial", "anotherParallel").collect(Collectors.toSet())))
+            .isNotEmpty().containsExactly("parallel", "anotherParallel");
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunctionTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunctionTest.java
index 4a1492b..b0f82a9 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunctionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunctionTest.java
@@ -53,11 +53,9 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalCacheForClientAccess;
 
 public class RegionAlterFunctionTest {
-
   private RegionAlterFunction function;
   private RegionConfig config;
   private RegionAttributesType regionAttributes;
-  private InternalCache internalCache;
   private InternalCacheForClientAccess cache;
   private FunctionContext<RegionConfig> context;
   private AttributesMutator mutator;
@@ -78,13 +76,14 @@ public class RegionAlterFunctionTest {
   }
 
   @Before
+  @SuppressWarnings("unchecked")
   public void setUp() throws Exception {
     function = spy(RegionAlterFunction.class);
     config = new RegionConfig();
     regionAttributes = new RegionAttributesType();
     config.setRegionAttributes(regionAttributes);
 
-    internalCache = mock(InternalCache.class);
+    InternalCache internalCache = mock(InternalCache.class);
     cache = mock(InternalCacheForClientAccess.class);
     mutator = mock(AttributesMutator.class);
     evictionMutator = mock(EvictionAttributesMutator.class);
@@ -101,7 +100,7 @@ public class RegionAlterFunctionTest {
   }
 
   @Test
-  public void executeFuntcionHappyPathRetunsStatusOK() {
+  public void executeFunctionHappyPathReturnsStatusOK() {
     doNothing().when(function).alterRegion(any(), any());
     config.setName("regionA");
     CliFunctionResult result = function.executeFunction(context);
@@ -245,6 +244,33 @@ public class RegionAlterFunctionTest {
   }
 
   @Test
+  public void updateWithAsynchronousEventQueues() {
+    regionAttributes.setAsyncEventQueueIds("queue2,queue3");
+    when(region.getAsyncEventQueueIds())
+        .thenReturn(new HashSet<>(Arrays.asList("queue1", "queue2")));
+    function.alterRegion(cache, config);
+
+    verify(mutator).removeAsyncEventQueueId("queue1");
+    verify(mutator, times(0)).removeAsyncEventQueueId("queue2");
+    verify(mutator).addAsyncEventQueueId("queue3");
+
+    // gatewaySender is left intact
+    verify(mutator, times(0)).addGatewaySenderId(any());
+    verify(mutator, times(0)).removeGatewaySenderId(any());
+  }
+
+  @Test
+  public void updateWithEmptyAsynchronousEventQueues() {
+    regionAttributes.setAsyncEventQueueIds("");
+    when(region.getAsyncEventQueueIds())
+        .thenReturn(new HashSet<>(Arrays.asList("queue1", "queue2")));
+    function.alterRegion(cache, config);
+
+    verify(mutator).removeAsyncEventQueueId("queue1");
+    verify(mutator).removeAsyncEventQueueId("queue2");
+  }
+
+  @Test
   public void updateWithCacheListeners() {
     // suppose region has one cacheListener, and we want to replace the oldOne one with the new one
     CacheListener oldOne = mock(CacheListener.class);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/WanValidationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/WanValidationsDUnitTest.java
index 3664e43..7453171 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/WanValidationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/WanValidationsDUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache.wan.misc;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -1297,43 +1298,31 @@ public class WanValidationsDUnitTest extends WANTestBase {
     vm2.invoke(() -> WANTestBase.createReceiverWithBindAddress(lnPort));
   }
 
-
   @Test
-  public void testBug50247_NonPersistentSenderWithPersistentRegion() throws Exception {
+  public void testBug50247_NonPersistentSenderWithPersistentRegion() {
     IgnoredException.addIgnoredException("could not get remote locator information");
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-
     createCacheInVMs(lnPort, vm4, vm5);
 
-    try {
-      vm4.invoke(
-          () -> WANTestBase.createSender("ln1", 2, true, 10, 100, false, false, null, false));
-      vm4.invoke(() -> WANTestBase
-          .createPartitionedRegionWithPersistence(getTestMethodName() + "_PR", "ln1", 1, 100));
-      fail("Expected GatewaySenderException with incompatible gateway sender ids and region");
-    } catch (Exception e) {
-      if (!(e.getCause() instanceof GatewaySenderException)
-          || !(e.getCause().getMessage().contains("can not be attached to persistent region "))) {
-        Assert.fail(
-            "Expected GatewaySenderException with incompatible gateway sender ids and region", e);
-      }
-    }
-
-    try {
-      vm5.invoke(() -> WANTestBase
-          .createPartitionedRegionWithPersistence(getTestMethodName() + "_PR", "ln1", 1, 100));
-      vm5.invoke(
-          () -> WANTestBase.createSender("ln1", 2, true, 10, 100, false, false, null, false));
-      fail("Expected GatewaySenderException with incompatible gateway sender ids and region");
-    } catch (Exception e) {
-      if (!(e.getCause() instanceof GatewaySenderException)
-          || !(e.getCause().getMessage().contains("can not be attached to persistent region "))) {
-        Assert.fail(
-            "Expected GatewaySenderException with incompatible gateway sender ids and region", e);
-      }
-    }
+    vm4.invoke(() -> WANTestBase.createSender("ln1", 2, true, 10, 100, false, false, null, false));
+    assertThatThrownBy(() -> vm4.invoke(() -> WANTestBase
+        .createPartitionedRegionWithPersistence(getTestMethodName() + "_PR", "ln1", 1, 100)))
+            .withFailMessage(
+                "Expected GatewaySenderException with incompatible gateway sender ids and region")
+            .hasRootCauseInstanceOf(GatewaySenderException.class)
+            .hasStackTraceContaining("can not be attached to persistent region ");
+
+    vm5.invoke(() -> WANTestBase.createPartitionedRegionWithPersistence(getTestMethodName() + "_PR",
+        "ln1", 1, 100));
+    assertThatThrownBy(() -> vm5
+        .invoke(() -> WANTestBase.createSender("ln1", 2, true, 10, 100, false, false, null, false)))
+            .withFailMessage(
+                "Expected GatewaySenderException with incompatible gateway sender ids and region")
+            .hasRootCauseInstanceOf(GatewaySenderException.class)
+            .hasStackTraceContaining("can not be attached to persistent region ");
   }
 
+
   /**
    * Test configuration::
    *
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterRegionCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterRegionCommandDUnitTest.java
new file mode 100644
index 0000000..4b82705
--- /dev/null
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterRegionCommandDUnitTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.CacheElement;
+import org.apache.geode.cache.configuration.RegionConfig;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(WanTest.class)
+public class AlterRegionCommandDUnitTest {
+  private static MemberVM locator;
+
+  @Rule
+  public GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Rule
+  public TestName testName = new SerializableTestName();
+
+  @Rule
+  public ClusterStartupRule lsRule = new ClusterStartupRule();
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void before() throws Exception {
+    locator = lsRule.startLocatorVM(0);
+    lsRule.startServerVM(1, locator.getPort());
+
+    gfsh.connectAndVerify(locator);
+    gfsh.executeAndAssertThat(
+        "create disk-store --name=diskStore --dir=" + temporaryFolder.getRoot()).statusIsSuccess();
+  }
+
+  @Test
+  public void alterPartitionRegionWithParallelAsynchronousEventQueueShouldPersistTheChangesIntoTheClusterConfigurationService() {
+    String regionName = testName.getMethodName();
+    String asyncEventQueueName = "asyncEventQueue1";
+
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --parallel=true --persistent=false --listener=org.apache.geode.internal.cache.wan.MyAsyncEventListener --id="
+            + asyncEventQueueName)
+        .statusIsSuccess();
+    locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers(asyncEventQueueName, 1);
+
+    gfsh.executeAndAssertThat("create region --type=PARTITION --name=" + regionName)
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/" + regionName, 1);
+
+    // Associate the async-event-queue
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + regionName + " --async-event-queue-id=" + asyncEventQueueName)
+        .statusIsSuccess().containsOutput("server-1", "OK", "Region " + regionName + " altered");
+
+    // Check the cluster configuration service.
+    locator.invoke(() -> {
+      InternalLocator internalLocator = ClusterStartupRule.getLocator();
+      assertThat(internalLocator).isNotNull();
+      CacheConfig config =
+          internalLocator.getConfigurationPersistenceService().getCacheConfig("cluster");
+
+      RegionConfig regionConfig = CacheElement.findElement(config.getRegions(), regionName);
+      assertThat(regionConfig).isNotNull();
+      assertThat(regionConfig.getRegionAttributes()).isNotNull();
+      assertThat(regionConfig.getRegionAttributes().getAsyncEventQueueIds()).isNotEmpty()
+          .isEqualTo(asyncEventQueueName);
+    });
+  }
+
+  @Test
+  public void alterNonColocatedPartitionRegionWithTheSameParallelAsynchronousEventQueueShouldThrowExceptionAndPreventTheClusterConfigurationServiceFromBeingUpdated() {
+    IgnoredException.addIgnoredException(
+        "Non colocated regions (.*) cannot have the same parallel async event queue id (.*) configured.");
+
+    String asyncEventQueue = "asyncEventQueue";
+    String region1Name = testName.getMethodName() + "1";
+    String region2Name = testName.getMethodName() + "2";
+
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --parallel=true --persistent=false --listener=org.apache.geode.internal.cache.wan.MyAsyncEventListener --id="
+            + asyncEventQueue)
+        .statusIsSuccess();
+    locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers(asyncEventQueue, 1);
+
+    gfsh.executeAndAssertThat("create region --type=PARTITION --name=" + region1Name)
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/" + region1Name, 1);
+
+    gfsh.executeAndAssertThat("create region --type=PARTITION --name=" + region2Name)
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/" + region2Name, 1);
+
+    // Associate the async-event-queue to both regions (second one should fail because they are not
+    // co-located)
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + region1Name + " --async-event-queue-id=" + asyncEventQueue)
+        .statusIsSuccess().containsOutput("server-1", "OK", "Region " + region1Name + " altered");
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + region2Name + " --async-event-queue-id=" + asyncEventQueue)
+        .statusIsError().containsOutput("server-1", "ERROR",
+            "Non colocated regions /" + region2Name + ", /" + region1Name
+                + " cannot have the same parallel async event queue id " + asyncEventQueue
+                + " configured.");
+
+    // The exception must be thrown early in the initialization, so the change shouldn't be
+    // persisted to the cluster configuration service for the second region.
+    locator.invoke(() -> {
+      InternalLocator internalLocator = ClusterStartupRule.getLocator();
+      assertThat(internalLocator).isNotNull();
+      CacheConfig config =
+          internalLocator.getConfigurationPersistenceService().getCacheConfig("cluster");
+
+      RegionConfig regionConfig = CacheElement.findElement(config.getRegions(), region1Name);
+      assertThat(regionConfig).isNotNull();
+      assertThat(regionConfig.getRegionAttributes()).isNotNull();
+      assertThat(regionConfig.getRegionAttributes().getAsyncEventQueueIds())
+          .isEqualTo(asyncEventQueue);
+
+      RegionConfig region2Config = CacheElement.findElement(config.getRegions(), region2Name);
+      assertThat(region2Config).isNotNull();
+      assertThat(region2Config.getRegionAttributes()).isNotNull();
+      assertThat(region2Config.getRegionAttributes().getAsyncEventQueueIds()).isBlank();
+    });
+  }
+
+  @Test
+  public void alterPartitionPersistentRegionWithParallelNonPersistentAsynchronousEventQueueShouldThrowExceptionAndPreventTheClusterConfigurationServiceFromBeingUpdated() {
+    IgnoredException.addIgnoredException(
+        "Non persistent asynchronous event queue (.*) can not be attached to persistent region (.*)");
+    String regionName = testName.getMethodName();
+    String asyncEventQueueName = "asyncEventQueue";
+
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --parallel=true --persistent=false --listener=org.apache.geode.internal.cache.wan.MyAsyncEventListener --id="
+            + asyncEventQueueName)
+        .statusIsSuccess();
+    locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers(asyncEventQueueName, 1);
+
+    gfsh.executeAndAssertThat("create region --type=PARTITION_PERSISTENT --name=" + regionName
+        + " --disk-store=diskStore").statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/" + regionName, 1);
+
+    // Make sure that the next invocations also fail and that the changes are not persisted to the
+    // cluster configuration service. See GEODE-6551.
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + regionName + " --async-event-queue-id=" + asyncEventQueueName)
+        .statusIsError()
+        .containsOutput("server-1", "ERROR", "Non persistent asynchronous event queue "
+            + asyncEventQueueName + " can not be attached to persistent region /" + regionName);
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + regionName + " --async-event-queue-id=" + asyncEventQueueName)
+        .statusIsError()
+        .containsOutput("server-1", "ERROR", "Non persistent asynchronous event queue "
+            + asyncEventQueueName + " can not be attached to persistent region /" + regionName);
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + regionName + " --async-event-queue-id=" + asyncEventQueueName)
+        .statusIsError()
+        .containsOutput("server-1", "ERROR", "Non persistent asynchronous event queue "
+            + asyncEventQueueName + " can not be attached to persistent region /" + regionName);
+
+    // The exception must be thrown early in the initialization, so the change shouldn't be
+    // persisted to the cluster configuration service.
+    locator.invoke(() -> {
+      InternalLocator internalLocator = ClusterStartupRule.getLocator();
+      assertThat(internalLocator).isNotNull();
+      CacheConfig config =
+          internalLocator.getConfigurationPersistenceService().getCacheConfig("cluster");
+
+      RegionConfig regionConfig = CacheElement.findElement(config.getRegions(), regionName);
+      assertThat(regionConfig).isNotNull();
+      assertThat(regionConfig.getRegionAttributes()).isNotNull();
+      assertThat(regionConfig.getRegionAttributes().getAsyncEventQueueIds()).isBlank();
+    });
+  }
+
+  @Test
+  public void alterPartitionRegionWithParallelGatewaySenderShouldPersistTheChangesIntoTheClusterConfigurationService() {
+    IgnoredException.addIgnoredException("could not get remote locator information");
+    String regionName = testName.getMethodName();
+    String gatewaySenderName = "gatewaySender";
+
+    gfsh.executeAndAssertThat(
+        "create gateway-sender --parallel=true --enable-persistence=false --remote-distributed-system-id=2 --id="
+            + gatewaySenderName)
+        .statusIsSuccess();
+    locator.waitUntilGatewaySendersAreReadyOnExactlyThisManyServers(1);
+
+    gfsh.executeAndAssertThat("create region --type=PARTITION --name=" + regionName)
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/" + regionName, 1);
+
+    // Associate the gateway-sender
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + regionName + " --gateway-sender-id=" + gatewaySenderName)
+        .statusIsSuccess().containsOutput("server-1", "OK", "Region " + regionName + " altered");
+
+    // Check the cluster configuration service.
+    locator.invoke(() -> {
+      InternalLocator internalLocator = ClusterStartupRule.getLocator();
+      assertThat(internalLocator).isNotNull();
+      CacheConfig config =
+          internalLocator.getConfigurationPersistenceService().getCacheConfig("cluster");
+
+      RegionConfig regionConfig = CacheElement.findElement(config.getRegions(), regionName);
+      assertThat(regionConfig).isNotNull();
+      assertThat(regionConfig.getRegionAttributes()).isNotNull();
+      assertThat(regionConfig.getRegionAttributes().getGatewaySenderIds()).isNotEmpty()
+          .isEqualTo(gatewaySenderName);
+    });
+  }
+
+  @Test
+  public void alterNonColocatedPartitionRegionWithTheSameParallelGatewaySenderShouldThrowExceptionAndPreventTheClusterConfigurationServiceFromBeingUpdated() {
+    IgnoredException.addIgnoredException("could not get remote locator information");
+    IgnoredException.addIgnoredException(
+        "Non colocated regions (.*) cannot have the same parallel gateway sender id (.*) configured.");
+
+    String gatewaySenderName = "gatewaySender";
+    String region1Name = testName.getMethodName() + "1";
+    String region2Name = testName.getMethodName() + "2";
+
+    gfsh.executeAndAssertThat(
+        "create gateway-sender --parallel=true --enable-persistence=false --remote-distributed-system-id=2 --id="
+            + gatewaySenderName)
+        .statusIsSuccess();
+    locator.waitUntilGatewaySendersAreReadyOnExactlyThisManyServers(1);
+
+    gfsh.executeAndAssertThat("create region --type=PARTITION --name=" + region1Name)
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/" + region1Name, 1);
+
+    gfsh.executeAndAssertThat("create region --type=PARTITION --name=" + region2Name)
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/" + region2Name, 1);
+
+    // Associate the gateway-sender to both regions (second one should fail because they are not
+    // co-located)
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + region1Name + " --gateway-sender-id=" + gatewaySenderName)
+        .statusIsSuccess().containsOutput("server-1", "OK", "Region " + region1Name + " altered");
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + region2Name + " --gateway-sender-id=" + gatewaySenderName)
+        .statusIsError().containsOutput("server-1", "ERROR",
+            "Non colocated regions /" + region2Name + ", /" + region1Name
+                + " cannot have the same parallel gateway sender id " + gatewaySenderName
+                + " configured.");
+
+    // The exception must be thrown early in the initialization, so the change shouldn't be
+    // persisted to the cluster configuration service for the second region.
+    locator.invoke(() -> {
+      InternalLocator internalLocator = ClusterStartupRule.getLocator();
+      assertThat(internalLocator).isNotNull();
+      CacheConfig config =
+          internalLocator.getConfigurationPersistenceService().getCacheConfig("cluster");
+
+      RegionConfig regionConfig = CacheElement.findElement(config.getRegions(), region1Name);
+      assertThat(regionConfig).isNotNull();
+      assertThat(regionConfig.getRegionAttributes()).isNotNull();
+      assertThat(regionConfig.getRegionAttributes().getGatewaySenderIds())
+          .isEqualTo(gatewaySenderName);
+
+      RegionConfig region2Config = CacheElement.findElement(config.getRegions(), region2Name);
+      assertThat(region2Config).isNotNull();
+      assertThat(region2Config.getRegionAttributes()).isNotNull();
+      assertThat(region2Config.getRegionAttributes().getGatewaySenderIds()).isBlank();
+    });
+  }
+
+  @Test
+  public void alterPartitionPersistentRegionWithParallelNonPersistentGatewaySenderShouldThrowExceptionAndPreventTheClusterConfigurationServiceFromBeingUpdated() {
+    IgnoredException.addIgnoredException("could not get remote locator information");
+    IgnoredException.addIgnoredException(
+        "Non persistent gateway sender (.*) can not be attached to persistent region (.*)");
+    String regionName = testName.getMethodName();
+    String gatewaySenderName = "gatewaySender";
+
+    gfsh.executeAndAssertThat(
+        "create gateway-sender --parallel=true --enable-persistence=false --remote-distributed-system-id=2 --id="
+            + gatewaySenderName)
+        .statusIsSuccess();
+    locator.waitUntilGatewaySendersAreReadyOnExactlyThisManyServers(1);
+
+    gfsh.executeAndAssertThat("create region --type=PARTITION_PERSISTENT --name=" + regionName
+        + " --disk-store=diskStore").statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/" + regionName, 1);
+
+    // Make sure that the next invocations also fail and that the changes are not persisted to the
+    // cluster configuration service. See GEODE-6551.
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + regionName + " --gateway-sender-id=" + gatewaySenderName)
+        .statusIsError().containsOutput("server-1", "ERROR", "Non persistent gateway sender "
+            + gatewaySenderName + " can not be attached to persistent region /" + regionName);
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + regionName + " --gateway-sender-id=" + gatewaySenderName)
+        .statusIsError().containsOutput("server-1", "ERROR", "Non persistent gateway sender "
+            + gatewaySenderName + " can not be attached to persistent region /" + regionName);
+    gfsh.executeAndAssertThat(
+        "alter region --name=" + regionName + " --gateway-sender-id=" + gatewaySenderName)
+        .statusIsError().containsOutput("server-1", "ERROR", "Non persistent gateway sender "
+            + gatewaySenderName + " can not be attached to persistent region /" + regionName);
+
+    // The exception must be thrown early in the initialization, so the change shouldn't be
+    // persisted to the cluster configuration service.
+    locator.invoke(() -> {
+      InternalLocator internalLocator = ClusterStartupRule.getLocator();
+      assertThat(internalLocator).isNotNull();
+      CacheConfig config =
+          internalLocator.getConfigurationPersistenceService().getCacheConfig("cluster");
+
+      RegionConfig regionConfig = CacheElement.findElement(config.getRegions(), regionName);
+      assertThat(regionConfig).isNotNull();
+      assertThat(regionConfig.getRegionAttributes()).isNotNull();
+      assertThat(regionConfig.getRegionAttributes().getGatewaySenderIds()).isBlank();
+    });
+  }
+}


Mime
View raw message