geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasonhu...@apache.org
Subject [3/7] incubator-geode git commit: GEODE-1062: Refactor of WANTestBase
Date Fri, 01 Apr 2016 17:32:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index fd1f0ee..8139dca 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -86,13 +86,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, true, null, true ));
@@ -112,10 +109,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -131,15 +125,6 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
 
   }
 
-  protected SerializableCallableIF<Integer> createReceiverRunnable(
-      Integer nyPort) {
-    return () -> WANTestBase.createReceiver( nyPort );
-  }
-
-  protected SerializableRunnableIF createCacheRunnable(Integer lnPort) {
-    return () -> WANTestBase.createCache( lnPort );
-  }
-
   /**
    * Enable persistence for the GatewaySender but not the region
    */
@@ -147,15 +132,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
     
     LogWriterUtils.getLogWriter().info("Created remote receivers");
-    
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
     
     LogWriterUtils.getLogWriter().info("Created local site cache");
 
@@ -176,13 +157,9 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm7.invoke(createPartitionedRegionRunnable());
 
     LogWriterUtils.getLogWriter().info("Created local site persistent PR");
-    
-    vm4.invoke(startSenderRunnable());
-    LogWriterUtils.getLogWriter().info("Started sender on vm4");
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Started the senders");
 
     vm2.invoke(() -> WANTestBase.createPartitionedRegion(
@@ -203,11 +180,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
         getTestMethodName(), "ln", 1, 100, isOffHeap() );
   }
 
-  protected SerializableRunnableIF startSenderRunnable() {
-    return () -> WANTestBase.startSender( "ln" );
-  }
-  
-  
+
+
   
   /**
    * Enable persistence for GatewaySender.
@@ -223,14 +197,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
     //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -253,10 +224,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm7.invoke(createPartitionedRegionRunnable());
 
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
     
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
@@ -284,11 +252,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
    //create senders with disk store
@@ -318,10 +283,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(startSenderRunnable());
-    vm5.invokeAsync(startSenderRunnable());
-    vm6.invokeAsync(startSenderRunnable());
-    vm7.invokeAsync(startSenderRunnable());
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
@@ -366,14 +328,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
     //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -400,11 +359,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -431,11 +387,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
    //create senders with disk store
@@ -469,11 +422,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(startSenderRunnable());
-    vm5.invokeAsync(startSenderRunnable());
-    vm6.invokeAsync(startSenderRunnable());
-    vm7.invokeAsync(startSenderRunnable());
-    
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
     vm4.invoke(waitForSenderRunnable());
@@ -505,10 +455,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
@@ -529,10 +476,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
     
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
@@ -560,11 +504,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
   //create senders with disk store
@@ -596,18 +537,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     }
     
     LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
-    
-    vm4.invoke(() -> WANTestBase.unsetRemoveFromQueueOnException( "ln" ));
-    vm5.invoke(() -> WANTestBase.unsetRemoveFromQueueOnException( "ln" ));
-    vm6.invoke(() -> WANTestBase.unsetRemoveFromQueueOnException( "ln" ));
-    vm7.invoke(() -> WANTestBase.unsetRemoveFromQueueOnException( "ln" ));
 
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(startSenderRunnable());
-    vm5.invokeAsync(startSenderRunnable());
-    vm6.invokeAsync(startSenderRunnable());
-    vm7.invokeAsync(startSenderRunnable());
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
@@ -619,8 +552,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
 
     LogWriterUtils.getLogWriter().info("Creating the receiver.");
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
     //create PR on remote site
     
     LogWriterUtils.getLogWriter().info("Creating the partitioned region at receiver. ");
@@ -628,6 +560,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), null, 1, 100, isOffHeap() ));
     vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
       getTestMethodName(), null, 1, 100, isOffHeap() ));
+    createReceiverInVMs(nyPort, vm2, vm3);
+
     vm4.invoke(pauseSenderRunnable());
     vm5.invoke(pauseSenderRunnable());
     vm6.invoke(pauseSenderRunnable());
@@ -665,14 +599,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
-    //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -699,11 +629,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -730,11 +657,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
    //create senders with disk store
@@ -769,10 +693,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(startSenderRunnable());
-    vm5.invokeAsync(startSenderRunnable());
-    vm6.invokeAsync(startSenderRunnable());
-    vm7.invokeAsync(startSenderRunnable());
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
@@ -813,14 +734,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
     //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -847,10 +765,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
     
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
@@ -878,11 +793,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
   //create senders with disk store
@@ -917,11 +829,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(startSenderRunnable());
-    vm5.invokeAsync(startSenderRunnable());
-    vm6.invokeAsync(startSenderRunnable());
-    vm7.invokeAsync(startSenderRunnable());
-    
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
     vm4.invoke(waitForSenderRunnable());
@@ -953,14 +862,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
-    //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -987,11 +892,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -1018,11 +920,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
   //create senders with disk store
@@ -1043,11 +942,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(startSenderRunnable());
-    vm5.invokeAsync(startSenderRunnable());
-    vm6.invokeAsync(startSenderRunnable());
-    vm7.invokeAsync(startSenderRunnable());
-    
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
     vm4.invoke(waitForSenderRunnable());
@@ -1089,10 +985,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
 
     // create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     // create PR on local site
     vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
@@ -1116,10 +1009,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
 
     // restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     LogWriterUtils.getLogWriter().info("Created back the cache");
 
@@ -1182,14 +1072,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
     //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -1216,11 +1103,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -1247,11 +1131,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
     
@@ -1265,11 +1146,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     
     //start the senders. NOTE that the senders are not associated with partitioned region
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Started the senders.");
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
@@ -1302,14 +1180,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
     //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -1332,11 +1207,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm7.invoke(createPartitionedRegionRunnable());
 
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -1362,11 +1234,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders. The local site has been brought down.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
   //create senders with disk store
@@ -1386,11 +1255,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
     
     //start the senders
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Started the senders.");
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
@@ -1426,14 +1292,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
     //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
       true, 100, 10, false, false, null, true ));
@@ -1461,11 +1324,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
     
     //start the senders on local site
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     //wait for senders to become running
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -1492,11 +1352,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
-    
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
    //create back the senders
@@ -1511,17 +1368,9 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     LogWriterUtils.getLogWriter().info("Created the senders again");
     
-    vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    
     //start the senders
-    vm4.invokeAsync(startSenderRunnable());
-    vm5.invokeAsync(startSenderRunnable());
-    vm6.invokeAsync(startSenderRunnable());
-    vm7.invokeAsync(startSenderRunnable());
-    
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
     LogWriterUtils.getLogWriter().info("Started the senders.");
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
@@ -1581,14 +1430,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
     //create cache in local site
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
     
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
       true, 100, 10, false, true, null, true ));
@@ -1612,12 +1458,9 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
     vm3.invoke(() -> WANTestBase.createPartitionedRegion(
       getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
     LogWriterUtils.getLogWriter().info("Completed puts in the region");
@@ -1640,13 +1483,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
       Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-      vm2.invoke(createReceiverRunnable(nyPort));
-      vm3.invoke(createReceiverRunnable(nyPort));
+      createCacheInVMs(nyPort, vm2, vm3);
+      createReceiverInVMs(nyPort, vm2, vm3);
 
-      vm4.invoke(createCacheRunnable(lnPort));
-      vm5.invoke(createCacheRunnable(lnPort));
-      vm6.invoke(createCacheRunnable(lnPort));
-      vm7.invoke(createCacheRunnable(lnPort));
+      createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
       vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, true, null, true ));
@@ -1666,10 +1506,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
-      vm4.invoke(startSenderRunnable());
-      vm5.invoke(startSenderRunnable());
-      vm6.invoke(startSenderRunnable());
-      vm7.invoke(startSenderRunnable());
+      startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
       vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -1694,10 +1531,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       vm6.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
       vm7.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
 
-      vm4.invoke(createCacheRunnable(lnPort));
-      vm5.invoke(createCacheRunnable(lnPort));
-      vm6.invoke(createCacheRunnable(lnPort));
-      vm7.invoke(createCacheRunnable(lnPort));
+      createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
       AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), null, 1, 100, isOffHeap() ));

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
index 594514b..52169ba 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
@@ -63,12 +63,9 @@ public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
 
     vm7.invoke(() -> WANTestBase.createClientWithLocator(
       lnPort, "localhost", getTestMethodName() + "_PR" ));
-    
-    AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    AsyncInvocation inv2 = vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
 
-    inv1.join();
-    inv2.join();
+    startSenderInVMsAsync("ln", vm5, vm6);
+
     // before doing any puts, let the senders be running in order to ensure that
     // not a single event will be lost
     

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
index 8ae5d0b..2c0d693 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
@@ -41,11 +41,10 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+    createCacheInVMs(lnPort, vm4, vm5);
  
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -56,10 +55,9 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
         getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
     vm5.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
- 
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
- 
+
+    startSenderInVMs("ln", vm4, vm5);
+
     vm2.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", null, 1, 1, isOffHeap() ));
     vm3.invoke(() -> WANTestBase.createPartitionedRegion(
@@ -107,11 +105,10 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+    createCacheInVMs(lnPort, vm4, vm5);
  
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -122,10 +119,9 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
         getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
     vm5.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
- 
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
- 
+
+    startSenderInVMs("ln", vm4, vm5);
+
     vm2.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", null, 1, 10, isOffHeap() ));
     vm3.invoke(() -> WANTestBase.createPartitionedRegion(
@@ -154,17 +150,15 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
+    createCacheInVMs(nyPort, vm2);
     vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
 
     vm2.invoke(() -> WANTestBase.createPartitionedRegion(
       getTestMethodName() + "_PR", null, 3, 4, isOffHeap() ));
   
     vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
-  
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -183,11 +177,8 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
         getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
     vm7.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
-    
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.addQueueListener( "ln", true));
     vm5.invoke(() -> WANTestBase.addQueueListener( "ln", true));

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
index d8d2585..6d4b03a 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -82,11 +82,8 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
   public void testParallelPropagation_withoutRemoteSite() throws Exception {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-    
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     //keep a larger batch to minimize number of exception occurrences in the log
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -103,15 +100,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm6.invoke(createPartitionedRegionRedundancy1Runnable());
     vm7.invoke(createPartitionedRegionRedundancy1Runnable());
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-
-    vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false ));
-    vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false));
-    vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false ));
-    vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false ));
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
     
     //make sure all the senders are running before doing any puts
     vm4.invoke(waitForSenderRunnable());
@@ -122,12 +111,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
       1000 ));
 
-    
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-    
+    createCacheInVMs(nyPort, vm2, vm3);
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+    createReceiverInVMs(nyPort, vm2, vm3);
     
     //verify all buckets drained on all sender nodes.
     vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
@@ -142,10 +129,6 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm2.invoke(() -> WANTestBase.validateRegionSize(
         getTestMethodName() + "_PR", 1000 ));
   }
-
-  protected SerializableRunnableIF createCacheRunnable(Integer lnPort) {
-    return () -> WANTestBase.createCache( lnPort );
-  }
   
   /**
    * Normal happy scenario test case.
@@ -155,13 +138,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -177,10 +156,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm6.invoke(createPartitionedRegionRedundancy1Runnable());
     vm7.invoke(createPartitionedRegionRedundancy1Runnable());
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -215,10 +191,6 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
         getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()  );
   }
 
-  protected SerializableRunnableIF startSenderRunnable() {
-    return () -> WANTestBase.startSender( "ln" );
-  }
-
   protected SerializableRunnableIF waitForSenderRunnable() {
     return () -> WANTestBase.waitForSenderRunningState( "ln" );
   }
@@ -227,13 +199,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, false ));
@@ -280,13 +248,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -304,21 +268,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
       1000 ));
-    
-    AsyncInvocation inv1 = vm4.invokeAsync(startSenderRunnable());
-    AsyncInvocation inv2 = vm5.invokeAsync(startSenderRunnable());
-    AsyncInvocation inv3 = vm6.invokeAsync(startSenderRunnable());
-    AsyncInvocation inv4 = vm7.invokeAsync(startSenderRunnable());
 
-    try{
-      inv1.join();
-      inv2.join();
-      inv3.join();
-      inv4.join();
-    }
-    catch(InterruptedException ie) {
-      fail("Caught interrupted exception");
-    }
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
 
@@ -349,13 +301,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -371,10 +319,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm6.invoke(createPartitionedRegionRedundancy1Runnable());
     vm7.invoke(createPartitionedRegionRedundancy1Runnable());
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -400,11 +345,8 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer regionSize = 
       (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" ));
     LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize);
-    
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
     
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -415,20 +357,12 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
 
-    vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    
     vm4.invoke(createPartitionedRegionRedundancy1Runnable());
     vm5.invoke(createPartitionedRegionRedundancy1Runnable());
     vm6.invoke(createPartitionedRegionRedundancy1Runnable());
     vm7.invoke(createPartitionedRegionRedundancy1Runnable());
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -458,13 +392,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -480,10 +410,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
     vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap()  ));
     vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap()  ));
@@ -512,13 +439,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -534,10 +457,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
     vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap()  ));
     vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap()  ));
@@ -565,13 +485,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap()  ));
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -591,15 +511,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm7.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName(), null, 1, 100, isOffHeap()  ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName(), null, 1, 100, isOffHeap()  ));
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     //let all the senders start before doing any puts to ensure that none of the events is lost
     vm4.invoke(waitForSenderRunnable());
@@ -625,13 +537,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "lnSerial",
         2, false, 100, 10, false, false, null, true ));
@@ -673,13 +581,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm7.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap()  ));
 
-    vm4.invoke(() -> WANTestBase.startSender( "lnSerial" ));
-    vm5.invoke(() -> WANTestBase.startSender( "lnSerial" ));
+    startSenderInVMs("lnSerial", vm4, vm5);
 
-    vm4.invoke(() -> WANTestBase.startSender( "lnParallel" ));
-    vm5.invoke(() -> WANTestBase.startSender( "lnParallel" ));
-    vm6.invoke(() -> WANTestBase.startSender( "lnParallel" ));
-    vm7.invoke(() -> WANTestBase.startSender( "lnParallel" ));
+    startSenderInVMs("lnParallel", vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
         1000 ));
@@ -704,13 +608,15 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
     Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(tkPort));
+    createCacheInVMs(nyPort, vm2);
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    createReceiverInVMs(nyPort, vm2);
 
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(tkPort, vm3);
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+    createReceiverInVMs(tkPort, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "lnParallel1",
         2, true, 100, 10, false, false, null, true ));
@@ -739,18 +645,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm7.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap()  ));
 
-    vm4.invoke(() -> WANTestBase.startSender( "lnParallel1" ));
-    vm5.invoke(() -> WANTestBase.startSender( "lnParallel1" ));
-    vm6.invoke(() -> WANTestBase.startSender( "lnParallel1" ));
-    vm7.invoke(() -> WANTestBase.startSender( "lnParallel1" ));
+    startSenderInVMs("lnParallel1", vm4, vm5, vm6, vm7);
 
-    vm4.invoke(() -> WANTestBase.startSender( "lnParallel2" ));
-    vm5.invoke(() -> WANTestBase.startSender( "lnParallel2" ));
-    vm6.invoke(() -> WANTestBase.startSender( "lnParallel2" ));
-    vm7.invoke(() -> WANTestBase.startSender( "lnParallel2" ));
+    startSenderInVMs("lnParallel2", vm4, vm5, vm6, vm7);
 
-    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
-    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
 
     //before doing puts, make sure that the senders are started.
     //this will ensure that not a single events is lost
@@ -793,13 +691,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -810,10 +704,6 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
 
-    vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-    vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
     
     vm4.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()  ));
@@ -824,11 +714,8 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm7.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()  ));
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
 
@@ -862,13 +749,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false,
@@ -892,10 +775,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm7.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm2.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName(), null, 1, 100, isOffHeap()  ));
@@ -921,13 +801,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -943,10 +819,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm6.invoke(createPartitionedRegionRedundancy1Runnable());
     vm7.invoke(createPartitionedRegionRedundancy1Runnable());
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -983,13 +856,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
-    vm6.invoke(createCacheRunnable(lnPort));
-    vm7.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 100, false, false, null, true ));
@@ -1005,10 +874,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm6.invoke(createPartitionedRegionRedundancy1Runnable());
     vm7.invoke(createPartitionedRegionRedundancy1Runnable());
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
-    vm6.invoke(startSenderRunnable());
-    vm7.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -1064,11 +930,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(createReceiverRunnable(nyPort));
-    vm3.invoke(createReceiverRunnable(nyPort));
-
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5);
     //vm6.invoke(() -> WANTestBase.createCache( lnPort ));
     //vm7.invoke(() -> WANTestBase.createCache( lnPort ));
 
@@ -1088,8 +952,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
 //    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
 //        testName + "_PR", "ln", true, 1, 100, isOffHeap()  ));
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5);
 //    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
 //    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
 
@@ -1118,11 +981,11 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
   public void disable_testParallelGatewaySenderQueueLocalSize() {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-    
-    vm2.invoke(createReceiverRunnable(nyPort));
 
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2);
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
+    createCacheInVMs(lnPort, vm4, vm5);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -1132,8 +995,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm4.invoke(createPartitionedRegionRedundancy1Runnable());
     vm5.invoke(createPartitionedRegionRedundancy1Runnable());
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5);
 
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -1171,11 +1033,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     IgnoredException.addIgnoredException("Unexpected IOException");
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-    
-    vm2.invoke(createReceiverRunnable(nyPort));
 
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
+    createCacheInVMs(lnPort, vm4, vm5);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false, null, true ));
@@ -1185,8 +1046,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm4.invoke(createPartitionedRegionRedundancy1Runnable());
     vm5.invoke(createPartitionedRegionRedundancy1Runnable());
 
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm4, vm5);
 
     vm4.invoke(waitForSenderRunnable());
     vm5.invoke(waitForSenderRunnable());
@@ -1225,22 +1085,22 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
    * Added for defect #50364 Can't colocate region that has AEQ with a region that does not have that same AEQ
    */
   public void testParallelSenderAttachedToChildRegionButNotToParentRegion() {
-	Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-	Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-	    
-	//create cache and receiver on site2
-	vm2.invoke(createReceiverRunnable(nyPort));
-
-	//create cache on site1
-	vm3.invoke(createCacheRunnable(lnPort));   
-	
-	//create sender on site1
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+	  //create cache and receiver on site2
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
+	  //create cache on site1
+    createCacheInVMs(lnPort, vm3);
+
+	  //create sender on site1
     vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
       true, 100, 10, false, false, null, true ));
     
     //start sender on site1
-    vm3.invoke(startSenderRunnable());
-    
+    startSenderInVMs("ln", vm3);
+
     //create leader (parent) PR on site1
     vm3.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() ));
@@ -1269,13 +1129,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm6.invoke(createReceiverRunnable(nyPort));
-    vm7.invoke(createReceiverRunnable(nyPort));
+    createCacheInVMs(nyPort, vm6, vm7);
+    createReceiverInVMs(nyPort, vm6, vm7);
 
-    vm2.invoke(createCacheRunnable(lnPort));
-    vm3.invoke(createCacheRunnable(lnPort));
-    vm4.invoke(createCacheRunnable(lnPort));
-    vm5.invoke(createCacheRunnable(lnPort));
+    createCacheInVMs(lnPort, vm2, vm3, vm4, vm5);
 
     vm2.invoke(() -> WANTestBase.createSender( "ln", 2, true,
         100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
@@ -1295,10 +1152,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
     vm5.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
-    vm2.invoke(startSenderRunnable());
-    vm3.invoke(startSenderRunnable());
-    vm4.invoke(startSenderRunnable());
-    vm5.invoke(startSenderRunnable());
+    startSenderInVMs("ln", vm2, vm3, vm4, vm5);
 
     vm6.invoke(() -> WANTestBase.createPartitionedRegion(
         getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -1347,8 +1201,4 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
         
   }
 
-  protected SerializableCallableIF<Integer> createReceiverRunnable(
-      Integer nyPort) {
-    return () -> WANTestBase.createReceiver( nyPort );
-  } 
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
index b921c88..7adba41 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
@@ -42,15 +42,11 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on site1 and site2
+    createCacheInVMs(lnPort, vm2, vm4, vm5);
     vm2.invoke(() -> WANTestBase.createReceiver( lnPort ));
+    createCacheInVMs(nyPort, vm3, vm6, vm7);
     vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
 
-    //create cache on site1 and site2
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( nyPort ));
-    vm7.invoke(() -> WANTestBase.createCache( nyPort ));
-
     //create senders on site1
     vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
       true, 100, 10, false, false, null, true ));
@@ -84,15 +80,13 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
       getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap()  ));
     
     //start sender on site1
-    vm2.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    
+    startSenderInVMs("ln", vm2, vm4, vm5);
+
+
     //start sender on site2
-    vm3.invoke(() -> WANTestBase.startSender( "ny" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ny" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ny" ));
-    
+    startSenderInVMs("ny", vm3, vm6, vm7);
+
+
     //pause senders on site1
     vm2.invoke(() -> WANTestBase.pauseSender( "ln" ));
     vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
@@ -162,13 +156,14 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
     Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
     
     //create cache and receivers on all the 3 sites
-    vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
-    vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
-    
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( nyPort ));
-    
+    createCacheInVMs(lnPort, vm3, vm6);
+    createReceiverInVMs(lnPort, vm3, vm6);
+    createCacheInVMs(nyPort, vm4, vm7);
+    createReceiverInVMs(nyPort, vm4, vm7);
+    createCacheInVMs(tkPort, vm5);
+    createReceiverInVMs(tkPort, vm5);
+
+
     //create senders on all the 3 sites
     vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
       true, 100, 10, false, false, null, true ));
@@ -198,12 +193,10 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
       getTestMethodName() + "_PR", "tk", 0, 100, isOffHeap() ));
     
     //start senders on all the sites 
-    vm3.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    
-    vm4.invoke(() -> WANTestBase.startSender( "ny" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ny" ));
-    
+    startSenderInVMs("ln", vm3, vm6);
+
+    startSenderInVMs("ny", vm4, vm7);
+
     vm5.invoke(() -> WANTestBase.startSender( "tk" ));
     
     //pause senders on site1 and site3. Site2 has the sender running to pass along events
@@ -265,14 +258,14 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
     Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
-    
+
+    createCacheInVMs(lnPort, vm3, vm6);
+    createCacheInVMs(nyPort, vm4, vm7);
+    createCacheInVMs(tkPort, vm5);
     vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
     vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
     vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
-    
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( nyPort ));
-    
+
     //site1
     vm3.invoke(() -> WANTestBase.createSender( "ln1", 2,
       true, 100, 10, false, false, null, true ));

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 55fd5ad..9d9c074 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -47,8 +47,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    createReceiver(vm2, nyPort);
-    createReceiver(vm3, nyPort);
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
     createSendersWithConflation(lnPort);
 
@@ -79,16 +79,16 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    createReceiver(vm2, nyPort);
-   
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
 
     createSenders(lnPort);
 
     createReceiverPR(vm2, 0);
    
     createSenderPRs(0);
-    
-    startSenders();
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.doPuts( testName,
         NUM_PUTS ));
@@ -116,15 +116,16 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    createReceiver(vm2, nyPort);
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
 
     createSenders(lnPort);
 
     createReceiverPR(vm2, 0);
 
     createSenderPRs(3);
-    
-    startSenders();
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.doPuts( testName,
         NUM_PUTS ));
@@ -152,8 +153,10 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
     Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
 
-    createReceiver(vm2, nyPort);
-    createReceiver(vm3, tkPort);
+    createCacheInVMs(nyPort, vm2);
+    createCacheInVMs(tkPort, vm3);
+    createReceiverInVMs(nyPort, vm2);
+    createReceiverInVMs(tkPort, vm3);
 
     vm4.invoke(() -> WANTestBase.createCache(lnPort ));
 
@@ -207,16 +210,16 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    createReceiver(vm2, nyPort);
-    
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
 
     createSenders(lnPort);
     
     createReceiverPR(vm2, 0);
     
     createSenderPRs(3);
-    
-    startSenders();
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName, 1000 ));
     pause(200);
@@ -256,7 +259,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    createReceiver(vm2, nyPort);
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
 
     createSenders(lnPort);
 
@@ -267,7 +271,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
 
     createSenderPRs(0);
 
-    startSenders();
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     //start puts in RR_1 in another thread
     vm4.invoke(() -> WANTestBase.doPuts( testName, 2000 ));
@@ -291,12 +295,10 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
 
-    createReceiver(vm2, nyPort);
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
 
-    vm4.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache(lnPort ));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, false,
@@ -313,7 +315,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
   
     createSenderPRs(0);
 
-    startSenders();
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
     createReceiverPR(vm2, 1);
     
@@ -342,7 +344,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    createReceiver(vm2, nyPort);
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(nyPort, vm2);
 
     createSendersWithConflation(lnPort);
 
@@ -433,7 +436,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
   }
 
   protected void startPausedSenders() {
-    startSenders();
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
     
     vm4.invoke(() ->pauseSender( "ln" ));
     vm5.invoke(() ->pauseSender( "ln" ));
@@ -441,22 +444,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     vm7.invoke(() ->pauseSender( "ln" ));
   }
 
-  protected void createReceiver(VM vm, Integer nyPort) {
-    vm.invoke(() -> WANTestBase.createReceiver( nyPort ));
-  }
-  
-  protected void startSenders() {
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
-  }
-
   protected void createSendersWithConflation(Integer lnPort) {
-    vm4.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache(lnPort ));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, true, false, null, true ));

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
index 7a05644..7f69904 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
@@ -58,8 +58,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
     vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
         false, 100, 10, false, false, null, false, true));
 
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+    startSenderInVMs("ln", vm4, vm5);
 
     vm4.invoke(() -> WANTestBase.createReplicatedRegion(
         getTestMethodName() + "_RR", "ln", isOffHeap() ));
@@ -98,13 +97,11 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
 
-    vm4.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache(lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache(lnPort ));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
         false, 100, 10, false, false, null, false, true));
@@ -116,8 +113,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
     vm3.invoke(() -> WANTestBase.createReplicatedRegion(
         getTestMethodName() + "_RR", null, isOffHeap() ));
 
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+    startSenderInVMs("ln", vm4, vm5);
 
     vm4.invoke(() -> WANTestBase.createReplicatedRegion(
         getTestMethodName() + "_RR", "ln", isOffHeap() ));
@@ -158,13 +154,10 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
         false, 100, 10, false, false, null, true, true));
@@ -176,8 +169,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
     vm3.invoke(() -> WANTestBase.createReplicatedRegion(
         getTestMethodName() + "_RR", null, isOffHeap() ));
 
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+    startSenderInVMs("ln", vm4, vm5);
 
     vm4.invoke(() -> WANTestBase.createReplicatedRegion(
         getTestMethodName() + "_RR", "ln", isOffHeap() ));
@@ -213,10 +205,10 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort ));
 
-    vm2.invoke(() -> WANTestBase.createReceiver(nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver(nyPort ));
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(nyPort, vm2, vm3);
 
-    vm4.invoke(() -> WANTestBase.createCache(lnPort ));
+    createCacheInVMs(lnPort, vm4);
 
     vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
         false, 100, 10, false, false, null, false, false ));



Mime
View raw message