geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [11/50] [abbrv] incubator-geode git commit: GEODE-865: CI Failure: ConcurrentParallelGatewaySenderOperation_2_DUnitTest.testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion
Date Mon, 01 Feb 2016 20:55:28 GMT
GEODE-865: CI Failure: ConcurrentParallelGatewaySenderOperation_2_DUnitTest.testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/68a85fed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/68a85fed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/68a85fed

Branch: refs/heads/feature/GEODE-773-2
Commit: 68a85fed991d7ebd9519c0f77b5b5b564b965a56
Parents: 09bd530
Author: Barry Oglesby <boglesby@pivotal.io>
Authored: Tue Jan 26 13:43:21 2016 -0800
Committer: Barry Oglesby <boglesby@pivotal.io>
Committed: Wed Jan 27 18:02:19 2016 -0800

----------------------------------------------------------------------
 .../gemfire/internal/cache/wan/WANTestBase.java |  14 +-
 ...allelGatewaySenderOperation_2_DUnitTest.java | 841 +++++--------------
 2 files changed, 234 insertions(+), 621 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/68a85fed/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index 0c32d1f..3dd4742 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -2395,7 +2395,7 @@ public class WANTestBase extends DistributedTestCase{
   public static void createConcurrentSender(String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory, Integer batchSize,
       boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
-      boolean isManulaStart, int concurrencyLevel, OrderPolicy policy) {
+      boolean isManualStart, int concurrencyLevel, OrderPolicy policy) {
 
     File persistentDirectory = new File(dsName + "_disk_"
         + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
@@ -2408,7 +2408,7 @@ public class WANTestBase extends DistributedTestCase{
       gateway.setParallel(true);
       gateway.setMaximumQueueMemory(maxMemory);
       gateway.setBatchSize(batchSize);
-      gateway.setManualStart(isManulaStart);
+      gateway.setManualStart(isManualStart);
       ((InternalGatewaySenderFactory) gateway)
           .setLocatorDiscoveryCallback(new MyLocatorCallback());
       if (filter != null) {
@@ -2430,7 +2430,7 @@ public class WANTestBase extends DistributedTestCase{
       GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
       gateway.setMaximumQueueMemory(maxMemory);
       gateway.setBatchSize(batchSize);
-      gateway.setManualStart(isManulaStart);
+      gateway.setManualStart(isManualStart);
       ((InternalGatewaySenderFactory) gateway)
           .setLocatorDiscoveryCallback(new MyLocatorCallback());
       if (filter != null) {
@@ -4825,7 +4825,13 @@ public class WANTestBase extends DistributedTestCase{
       }
     }
   }
-  
+
+  protected Integer[] createLNAndNYLocators() {
+    Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+    return new Integer[] { lnPort, nyPort };
+  }
+
   public static class MyLocatorCallback extends
       LocatorDiscoveryCallbackAdapter {
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/68a85fed/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
index f770062..694fc1f 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
@@ -23,6 +23,8 @@ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
 import com.gemstone.gemfire.test.dunit.AsyncInvocation;
 import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.VM;
+
 /**
  * @author skumar
  *
@@ -46,239 +48,75 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
   // to test that when userPR is locally destroyed, shadow Pr is also locally
   // destroyed and on recreation usrePr , shadow Pr is also recreated.
   public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY });
-
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-
-      getLogWriter().info("Created PRs on local site");
-
-      vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-          10 });
-
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+      createAndStartSender(vm4, lnPort, 5, false, true);
 
-      // since sender is paused, no dispatching
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
+      createReceiverAndDoPutsInPausedSender(nyPort);
 
-      vm4.invoke(WANTestBase.class, "localDestroyRegion",
-          new Object[] { testName + "_PR" });
+      vm4.invoke(() -> localDestroyRegion(testName + "_PR"));
 
-      // since shodowPR is locally destroyed, so no data to dispatch
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
-
-      vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
-          testName + "_PR", 10, 20 });
-
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
-
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+      recreatePRDoPutsAndValidateRegionSizes(0, true);
     } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
   }
   
   public void testParallelGatewaySender_SingleNode_UserPR_Destroy_RecreateRegion() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
+      createAndStartSender(vm4, lnPort, 4, false, true);
 
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, false, 4, OrderPolicy.KEY });
+      createReceiverAndDoPutsInPausedSender(nyPort);
 
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+      vm4.invoke(() -> resumeSender("ln"));
 
-      vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+      vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
 
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-
-      getLogWriter().info("Created PRs on local site");
+      vm4.invoke(() -> localDestroyRegion(testName + "_PR"));
 
-      vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-          10 });
-
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
-
-      // since resume is paused, no dispatching
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
-
-      vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+      recreatePRDoPutsAndValidateRegionSizes(10, false);
+    } finally {
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+    }
+  }
 
-      vm4.invoke(WANTestBase.class, "destroyRegion", new Object[] { testName
-          + "_PR" });
+  public void testParallelGatewaySender_SingleNode_UserPR_Close_RecreateRegion() throws Exception {
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
-      // before destoy, there is wait for queue to drain, so data will be
-      // dispatched
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+    createAndStartSender(vm4, lnPort, 7, false, true);
 
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
+    createReceiverAndDoPutsInPausedSender(nyPort);
 
-      vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
-          testName + "_PR", 10, 20 });
+    vm4.invoke(() -> closeRegion(testName + "_PR"));
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+    vm4.invoke(() -> resumeSender("ln"));
 
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 20 });
-    } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-        "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
-    }
-  }
-  
-  
-  public void testParallelGatewaySender_SingleNode_UserPR_Close_RecreateRegion() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    pause(500); //paused if there is any element which is received on remote site
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    recreatePRDoPutsAndValidateRegionSizes(0, false);
 
-    vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME", new Object[] { lnPort });
-    
-    getLogWriter().info("Created cache on local site");
-    
-    getLogWriter().info("Created senders on local site");
-    
-    vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, false, 7, OrderPolicy.KEY });
-  
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 1, 10, isOffHeap()});
-    
-    getLogWriter().info("Created PRs on local site");
-    
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName + "_PR", null, 1, 10, isOffHeap() });
-   
-    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 10 });
-    
-    vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_PR", 10 });
-    
-    //since resume is paused, no dispatching
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_PR", 0 });
-    
-    vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class, "closeRegion", new Object[] { testName + "_PR" });
-    
-    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    
-    pause(500); //paused if there is any element which is received on remote site
-    
-    //before close, there is wait for queue to drain, so data will be dispatched
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_PR", 0 });
-    
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 1, 10, isOffHeap() });
-    
-    vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] { testName + "_PR", 10, 20 });
-    
-    vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_PR", 10 });
-    
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_PR", 10 });
-    
-    vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-        "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+    vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
   }
   
   //to test that while localDestroy is in progress, put operation does not successed
   public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_SimultenuousPut_RecreateRegion() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      getLogWriter().info("Created cache on local site");
+      createAndStartSender(vm4, lnPort, 5, false, true);
 
-      getLogWriter().info("Created senders on local site");
-
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY });
-
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-
-      getLogWriter().info("Created PRs on local site");
-
-      vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-          10 });
-
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
-
-      // since resume is paused, no dispatching
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
+      createReceiverAndDoPutsInPausedSender(nyPort);
 
       AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class,
           "doPutsFrom", new Object[] { testName + "_PR", 100, 2000 });
@@ -302,73 +140,30 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
             putAsync.getException());
       }
 
-      vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+      vm4.invoke(() -> resumeSender("ln"));
 
       pause(500); // paused if there is any element which is received on remote
                   // site
 
-      // since shodowPR is locally destroyed, so no data to dispatch
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
-          testName + "_PR", 10, 20 });
-
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
-
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+      recreatePRDoPutsAndValidateRegionSizes(0, false);
     } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
   }
   
   public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultenuousPut_RecreateRegion() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, false, 6, OrderPolicy.KEY });
-
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-      vm4.invoke(WANTestBase.class, "addCacheListenerAndDestroyRegion", new Object[] {
-        testName + "_PR"});
-      
-      getLogWriter().info("Created PRs on local site");
-
-      vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-          10 });
+      createAndStartSender(vm4, lnPort, 6, false, true);
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+      vm4.invoke(() -> addCacheListenerAndDestroyRegion(testName + "_PR"));
 
-      // since resume is paused, no dispatching
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
+      createReceiverAndDoPutsInPausedSender(nyPort);
 
-      vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+      vm4.invoke(() -> resumeSender("ln"));
 
       AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class,
           "doPutsFrom", new Object[] { testName + "_PR", 10, 101 });
@@ -387,27 +182,18 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
 
       // before destroy, there is wait for queue to drain, so data will be
       // dispatched
-      vm2.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "validateRegionSizeWithinRange", new Object[] { testName + "_PR", 10,
-              101 }); // possible size is more than 10
+      vm2.invoke(() -> validateRegionSizeWithinRange(testName + "_PR", 10, 101)); // possible size is more than 10
 
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
+      vm4.invoke(() -> createPartitionedRegion(testName + "_PR", "ln", 1, 10, isOffHeap()));
 
-      vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
-          testName + "_PR", 10, 20 });
+      vm4.invoke(() -> doPutsFrom(testName + "_PR", 10, 20));
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+      vm4.invoke(() -> validateRegionSize(testName + "_PR", 10));
 
-      vm2.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "validateRegionSizeWithinRange", new Object[] { testName + "_PR", 20,
-              101 });// possible size is more than 20
+      vm2.invoke(() -> validateRegionSizeWithinRange(testName + "_PR", 20, 101)); // possible size is more than 20
     } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
-    
   }
   
   public void testParallelGatewaySender_SingleNode_UserPR_Destroy_NodeDown()
@@ -415,60 +201,16 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
     addExpectedException("Broken pipe");
     addExpectedException("Connection reset");
     addExpectedException("Unexpected IOException");
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-      vm6.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY });
-      vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY });
-      vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY });
-
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-      vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-      vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-      vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-      vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-      vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-
-      getLogWriter().info("Created PRs on local site");
-
-      vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-          10000 });
+      createAndStartSender(vm4, lnPort, 5, false, true);
+      createAndStartSender(vm5, lnPort, 5, false, true);
+      createAndStartSender(vm6, lnPort, 5, false, true);
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10000 });
-
-      // since resume is paused, no dispatching
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
+      createReceiverAndDoPutsInPausedSender(nyPort);
 
       vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
       vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
@@ -488,60 +230,24 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
         fail("Interrupted the async invocation.");
       }
 
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10000 });// possible size is more than 20
+      vm2.invoke(() -> validateRegionSize(testName + "_PR", 10));
     } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
-      vm6.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm6.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
 
   }
   
   public void testParallelGatewaySender_SingleNode_UserPR_Close_SimultenuousPut_RecreateRegion() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      getLogWriter().info("Created cache on local site");
-
-      getLogWriter().info("Created senders on local site");
+      createAndStartSender(vm4, lnPort, 5, false, true);
 
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY });
-
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-
-      getLogWriter().info("Created PRs on local site");
-
-      vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-          10 });
-
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
-
-      // since resume is paused, no dispatching
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
+      createReceiverAndDoPutsInPausedSender(nyPort);
 
       AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class,
           "doPutsFrom", new Object[] { testName + "_PR", 10, 2000 });
@@ -556,140 +262,108 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
         fail("Interrupted the async invocation.");
       }
 
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 0 });
-
-      vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 10, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
-          testName + "_PR", 10, 20 });
+      recreatePRDoPutsAndValidateRegionSizes(0, true);
+    } finally {
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+    }
+  }
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+  private void createReceiverAndDoPutsInPausedSender(int port) {
+    // Note: This is a test-specific method used by several tests to do puts from vm4 to vm2.
+    String regionName = testName + "_PR";
+    vm2.invoke(() -> createReceiver(port));
+    vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+    vm4.invoke(() -> doPuts(regionName, 10));
+    vm4.invoke(() -> validateRegionSize(regionName, 10));
+    // since sender is paused, no dispatching
+    vm2.invoke(() -> validateRegionSize(regionName, 0));
+  }
 
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
-    } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+  private void recreatePRDoPutsAndValidateRegionSizes(int expectedRegionSize, boolean resumeSender) {
+    // Note: This is a test-specific method used by several test to recreate a partitioned region,
+    // do puts and validate region sizes in vm2 and vm4.
+    // since shadowPR is locally destroyed, so no data to dispatch
+    String regionName = testName + "_PR";
+    vm2.invoke(() -> validateRegionSize(regionName, expectedRegionSize));
+    if (resumeSender) {
+      vm4.invoke(() -> resumeSender("ln"));
     }
+    vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+    vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
+    validateRegionSizes(regionName, 10, vm4, vm2);
   }
-  
+
   public void testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-    Integer tkPort = (Integer)vm2.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 3, lnPort });
-    Integer pnPort = (Integer)vm3.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 4, lnPort });
-    
-    vm4.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm5.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort });
-    vm6.invoke(WANTestBase.class, "createReceiver", new Object[] { pnPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
+    Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort));
+    Integer pnPort = (Integer)vm3.invoke(() -> createFirstRemoteLocator(4, lnPort));
+
+    vm4.invoke(() -> createReceiver(nyPort));
+    vm5.invoke(() -> createReceiver(tkPort));
+    vm6.invoke(() -> createReceiver(pnPort));
 
     try {
-      vm7.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
+      vm7.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(lnPort));
 
       getLogWriter().info("Created cache on local site");
 
-      vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln1", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
+      vm7.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+      vm7.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+      vm7.invoke(() -> createConcurrentSender("ln3", 4, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
 
-      vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln2", 3, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
+      vm7.invoke(() -> startSender("ln1"));
+      vm7.invoke(() -> startSender("ln2"));
+      vm7.invoke(() -> startSender("ln3"));
 
-      vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln3", 4, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
+      String regionName = testName + "_PR";
+      vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap()));
 
-      vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
-      vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
-      vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln3" });
+      getLogWriter().info("Created PRs on local site");
 
-      vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln1,ln2,ln3", 1, 10, isOffHeap() });
+      vm4.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+      vm5.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+      vm6.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
 
-      getLogWriter().info("Created PRs on local site");
+      vm7.invoke(() -> doPuts(regionName, 10));
 
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
-      vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
-      vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 10, isOffHeap() });
+      vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln1"));
+      vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln2"));
+      vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln3"));
 
-      vm7.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-          10 });
-      pause(1000);
-      vm7.invoke(WANTestBase.class, "localDestroyRegion",
-          new Object[] { testName + "_PR" });
+      vm7.invoke(() -> localDestroyRegion(regionName));
 
-      vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln1,ln2,ln3", 1, 10, isOffHeap() });
+      vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap()));
 
-      vm7.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
-          testName + "_PR", 10, 20 });
+      vm7.invoke(() -> doPutsFrom(regionName, 10, 20));
 
-      vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 10 });
+      vm7.invoke(() -> validateRegionSize(regionName, 10));
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 20 });
-      vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 20 });
-      vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 20 });
+      validateRegionSizes(regionName, 20, vm4, vm5, vm6);
     } finally {
-      vm7.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm7.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
   }
   
   public void testParallelGatewaySender_MultipleNode_UserPR_localDestroy_Recreate() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm2.invoke(() -> createReceiver(nyPort));
 
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 100, isOffHeap() });
-      vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 100, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
-      vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
-
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-      getLogWriter().info("Created PRs on local site");
+      createAndStartSender(vm4, lnPort, 5, true, false);
+      createAndStartSender(vm5, lnPort, 5, true, false);
 
-      vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 100, isOffHeap() });
+      String regionName = testName + "_PR";
+      vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
 
       AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
-          new Object[] { testName + "_PR", 1000 });
+          new Object[] { regionName, 10 });
       pause(1000);
-      vm5.invoke(WANTestBase.class, "localDestroyRegion",
-          new Object[] { testName + "_PR" });
+      vm5.invoke(() -> localDestroyRegion(regionName));
 
       try {
         inv1.join();
@@ -698,79 +372,45 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
         fail("Interrupted the async invocation.");
       }
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 1000 });
-
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 1000 });
 
-      vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln", 1, 100, isOffHeap() });
+      validateRegionSizes(regionName, 10, vm4, vm2);
 
-      vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
-          testName + "_PR", 1000, 2000 });
+      vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 2000 });
+      vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
 
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 2000 });
+      validateRegionSizes(regionName, 20, vm4, vm2);
     } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
   }
   
   public void testParallelGatewaySenders_MultiplNode_UserPR_localDestroy_Recreate() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-    Integer tkPort = (Integer)vm2.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 3, lnPort });
-    
-    vm6.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm7.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort });
-    
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
+    Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort));
+
+    vm6.invoke(() -> createReceiver(nyPort));
+    vm7.invoke(() -> createReceiver(tkPort));
+
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() });
-      vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln1", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY });
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln2", 3, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY });
-      vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln1", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY });
-      vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln2", 3, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY });
-
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
-      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
-      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
+      createAndStartTwoSenders(vm4, lnPort);
+      createAndStartTwoSenders(vm5, lnPort);
 
+      String regionName = testName + "_PR";
       vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 100, isOffHeap() });
+          regionName, null, 1, 100, isOffHeap() });
       vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", null, 1, 100, isOffHeap() });
+          regionName, null, 1, 100, isOffHeap() });
 
       AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
-          new Object[] { testName + "_PR", 1000 });
+          new Object[] { regionName, 10 });
+
       pause(1000);
       vm5.invoke(WANTestBase.class, "localDestroyRegion",
-          new Object[] { testName + "_PR" });
+          new Object[] { regionName });
 
       try {
         inv1.join();
@@ -779,79 +419,52 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
         fail("Interrupted the async invocation.");
       }
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 1000 });
-
-      vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 1000 });
-      vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 1000 });
+      validateRegionSizes(regionName, 10, vm4, vm6, vm7);
 
       vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-          testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() });
+          regionName, "ln1,ln2", 1, 100, isOffHeap() });
 
       vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
-          testName + "_PR", 1000, 2000 });
+          regionName, 10, 20 });
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 2000 });
-
-      vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 2000 });
-      vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          testName + "_PR", 2000 });
+      validateRegionSizes(regionName, 20, vm4, vm6, vm7);
     } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
   }
-  
-  public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+  private void createAndStartTwoSenders(VM vm, int port) {
+    // Note: This is a test-specific method used to create and start 2 senders.
+    vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
+    vm.invoke(() -> createPartitionedRegion(testName + "_PR", "ln1,ln2", 1, 100, isOffHeap()));
+    vm.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY));
+    vm.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY));
+    vm.invoke(() -> startSender("ln1"));
+    vm.invoke(() -> startSender("ln2"));
+  }
 
-    try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      vm4.invoke(WANTestBase.class,
-          "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
-              "ln", 1, 100, isOffHeap() });
-      vm5.invoke(WANTestBase.class,
-          "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
-              "ln", 1, 100, isOffHeap() });
+  public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() throws Exception {
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
-      vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
+    vm2.invoke(() -> createReceiver(nyPort));
 
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    try {
+      createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 5, true);
+      createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 5, true);
 
       getLogWriter().info("Created PRs on local site");
 
-      vm2.invoke(WANTestBase.class,
-          "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
-              null, 1, 100, isOffHeap() });
+      vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, null, 1, 100, isOffHeap()));
 
       AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class,
-          "putcolocatedPartitionedRegion", new Object[] { 2000 });
+          "putcolocatedPartitionedRegion", new Object[] { 10 });
       pause(1000);
 
       try {
-        vm5.invoke(WANTestBase.class, "localDestroyRegion",
-            new Object[] { customerRegionName });
+        vm5.invoke(() -> localDestroyRegion(customerRegionName));
       } catch (Exception ex) {
         assertTrue(ex.getCause() instanceof UnsupportedOperationException);
       }
@@ -862,51 +475,29 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
         fail("Unexpected exception", e);
       }
 
-      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          customerRegionName, 2000 });
-      vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          customerRegionName, 2000 });
-      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-          customerRegionName, 2000 });
+      validateRegionSizes(customerRegionName, 10, vm4, vm5, vm2);
     } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
-    
   }
-  
+
+  private void validateRegionSizes(String regionName, int expectedRegionSize, VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(() -> validateRegionSize(regionName, expectedRegionSize));
+    }
+  }
+
   public void testParallelGatewaySender_ColocatedPartitionedRegions_destroy() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer[] locatorPorts = createLNAndNYLocators();
+    Integer lnPort = locatorPorts[0];
+    Integer nyPort = locatorPorts[1];
 
     vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
 
     try {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
-          new Object[] { lnPort });
-
-      vm4.invoke(WANTestBase.class,
-          "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
-              "ln", 1, 100, isOffHeap() });
-      vm5.invoke(WANTestBase.class,
-          "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
-              "ln", 1, 100, isOffHeap() });
-
-      vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY });
-      vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
-          "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY });
-
-      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+      createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 6, true);
+      createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 6, true);
 
       getLogWriter().info("Created PRs on local site");
 
@@ -927,13 +518,29 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
       }
       fail("Excpeted UnsupportedOperationException");
     } finally {
-      vm4.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
-      vm5.invoke(ConcurrentParallelGatewaySenderOperation_2_DUnitTest.class,
-          "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
+      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
     }
   }
-  
+
+  private void createAndStartSenderWithCustomerOrderShipmentRegion(VM vm, int port, int concurrencyLevel, boolean manualStart) {
+    vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
+    vm.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, "ln", 1, 100, isOffHeap()));
+    vm.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, manualStart, concurrencyLevel, OrderPolicy.KEY));
+    vm.invoke(() -> startSender("ln"));
+  }
+
+  private void createAndStartSender(VM vm, int port, int concurrencyLevel, boolean manualStart, boolean pause) {
+    vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
+    vm.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, manualStart, concurrencyLevel, OrderPolicy.KEY));
+    vm.invoke(() -> startSender("ln"));
+    if (pause) {
+      vm.invoke(() -> pauseSender("ln"));
+    }
+    vm.invoke(() -> createPartitionedRegion(testName + "_PR", "ln", 1, 10, isOffHeap()));
+    getLogWriter().info("Created PRs on local site");
+  }
+
   public static void createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(
       Integer locPort) {
     createCache(false, locPort);


Mime
View raw message