geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [10/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Wed, 20 Jan 2016 02:22:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java
new file mode 100644
index 0000000..ee8b5d6
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java
@@ -0,0 +1,523 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.VM;
+
+/**
+ * DUnit for ParallelSenderQueue overflow operations.
+ * 
+ * @author pdeole
+ *
+ */
+public class ParallelGatewaySenderQueueOverflowDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+  
+  public ParallelGatewaySenderQueueOverflowDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+  
+  public void testParallelSenderQueueEventsOverflow_NoDiskStoreSpecified() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSenderWithoutDiskStore", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSenderWithoutDiskStore", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSenderWithoutDiskStore", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSenderWithoutDiskStore", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    
+    //give some time for the senders to pause
+    pause(1000);
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    int numEventPuts = 50;
+    vm4.invoke(WANTestBase.class, "doHeavyPuts", new Object[] { testName, numEventPuts });
+    
+    long numOvVm4 = (Long) vm4.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm5 = (Long) vm5.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm6 = (Long) vm6.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm7 = (Long) vm7.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    
+    long numMemVm4 = (Long) vm4.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm5 = (Long) vm5.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm6 = (Long) vm6.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm7 = (Long) vm7.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    
+    getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7);
+    getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7);
+    
+    long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; 
+    //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk.
+    assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55));
+    
+    long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7;
+    //expected is twice the number of events put due to redundancy level of 1  
+    assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory));
+    
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName, 50 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName, 50 });
+  }
+  
+  /**
+   * Keep same max memory limit for all the VMs
+   *   
+   * @throws Exception
+   */
+  public void _testParallelSenderQueueEventsOverflow() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    
+    //give some time for the senders to pause
+    pause(1000);
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    int numEventPuts = 50;
+    vm4.invoke(WANTestBase.class, "doHeavyPuts", new Object[] { testName, numEventPuts });
+    
+    long numOvVm4 = (Long) vm4.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm5 = (Long) vm5.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm6 = (Long) vm6.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm7 = (Long) vm7.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    
+    long numMemVm4 = (Long) vm4.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm5 = (Long) vm5.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm6 = (Long) vm6.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm7 = (Long) vm7.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    
+    getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7);
+    getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7);
+    
+    long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; 
+    //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk.
+    assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55));
+    
+    long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7;
+    //expected is twice the number of events put due to redundancy level of 1  
+    assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory));
+    
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName, 50 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName, 50 });
+  }
+
+  /**
+   * Set a different memory limit for each VM and make sure that all the VMs are utilized to
+   * full extent of available memory.
+   * 
+   * @throws Exception
+   */
+  public void _testParallelSenderQueueEventsOverflow_2() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 5, 10, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 5, 10, false, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 20, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    
+    //give some time for the senders to pause
+    pause(1000);
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    int numEventPuts = 50;
+    vm4.invoke(WANTestBase.class, "doHeavyPuts", new Object[] { testName, numEventPuts });
+    
+    long numOvVm4 = (Long) vm4.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm5 = (Long) vm5.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm6 = (Long) vm6.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm7 = (Long) vm7.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    
+    long numMemVm4 = (Long) vm4.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm5 = (Long) vm5.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm6 = (Long) vm6.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm7 = (Long) vm7.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    
+    getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7);
+    getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7);
+    
+    long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; 
+    //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk.
+    assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55));
+    
+    long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7;
+    //expected is twice the number of events put due to redundancy level of 1  
+    assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory));
+    
+    //assert the numbers for each VM
+    assertTrue("Number of entries in memory VM4 is incorrect. Should be less than 10", (numMemVm4 < 10));
+    assertTrue("Number of entries in memory VM5 is incorrect. Should be less than 5", (numMemVm5 < 5));
+    assertTrue("Number of entries in memory VM6 is incorrect. Should be less than 5", (numMemVm6 < 5));
+    assertTrue("Number of entries in memory VM7 is incorrect. Should be less than 20", (numMemVm7 < 20));
+    
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName, 50 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName, 50 });
+  }
+
+  public void _testParallelSenderQueueNoEventsOverflow() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 10, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    
+    //give some time for the senders to pause
+    pause(1000);
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    int numEventPuts = 15;
+    vm4.invoke(WANTestBase.class, "doHeavyPuts", new Object[] { testName, numEventPuts });
+    
+    long numOvVm4 = (Long) vm4.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm5 = (Long) vm5.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm6 = (Long) vm6.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    long numOvVm7 = (Long) vm7.invoke(WANTestBase.class, "getNumberOfEntriesOverflownToDisk", new Object[] { "ln" });
+    
+    long numMemVm4 = (Long) vm4.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm5 = (Long) vm5.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm6 = (Long) vm6.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    long numMemVm7 = (Long) vm7.invoke(WANTestBase.class, "getNumberOfEntriesInVM", new Object[] { "ln" });
+    
+    getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7);
+    getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7);
+    
+    long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; 
+    //all 30 (considering redundant copies) events should accommodate in 40 MB space given to 4 senders
+    assertEquals("Total number of entries overflown to disk is incorrect", 0, totalOverflown);
+    
+    long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7;
+    //expected is twice the number of events put due to redundancy level of 1  
+    assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory));
+    
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName, 15 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName, 15 });
+  }
+  
+  /**
+   * Test to validate that ParallelGatewaySenderQueue diskSynchronous attribute
+   * when persistence of sender is enabled. 
+   */
+  public void _test_ValidateParallelGatewaySenderQueueAttributes_1() {
+    Integer localLocPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    
+    Integer remoteLocPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localLocPort });
+    
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + localLocPort + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);  
+
+    File directory = new File("TKSender" + "_disk_"
+        + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    directory.mkdir();
+    File[] dirs1 = new File[] { directory };
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    dsf.setDiskDirs(dirs1);
+    DiskStore diskStore = dsf.create("FORNY");
+    
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setParallel(true);//set parallel to true
+    fact.setBatchConflationEnabled(true);
+    fact.setBatchSize(200);
+    fact.setBatchTimeInterval(300);
+    fact.setPersistenceEnabled(true);//enable the persistence
+    fact.setDiskSynchronous(true);
+    fact.setDiskStoreName("FORNY");
+    fact.setMaximumQueueMemory(200);
+    fact.setAlertThreshold(1200);
+    GatewayEventFilter myeventfilter1 = new MyGatewayEventFilter1();
+    fact.addGatewayEventFilter(myeventfilter1);
+    GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamfilter1);
+    GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+    fact.addGatewayTransportFilter(myStreamfilter2);
+    final ExpectedException exTKSender = addExpectedException("Could not connect");
+    try {
+      GatewaySender sender1 = fact.create("TKSender", 2);
+
+      AttributesFactory factory = new AttributesFactory();
+      factory.addGatewaySenderId(sender1.getId());
+      factory.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+      Region region = cache.createRegionFactory(factory.create()).create(
+          "test_ValidateGatewaySenderAttributes");
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      assertEquals(senders.size(), 1);
+      GatewaySender gatewaySender = senders.iterator().next();
+      Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender)
+          .getQueues();
+      assertEquals(regionQueues.size(), 1);
+      RegionQueue regionQueue = regionQueues.iterator().next();
+      assertEquals(true, regionQueue.getRegion().getAttributes()
+          .isDiskSynchronous());
+    } finally {
+      exTKSender.remove();
+    }
+  }
+  
+  /**
+   * Test to validate that ParallelGatewaySenderQueue diskSynchronous attribute
+   * when persistence of sender is not enabled. 
+   */
+  public void _test_ValidateParallelGatewaySenderQueueAttributes_2() {
+    Integer localLocPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    
+    Integer remoteLocPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localLocPort });
+    
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + localLocPort + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);  
+
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setParallel(true);//set parallel to true
+    fact.setBatchConflationEnabled(true);
+    fact.setBatchSize(200);
+    fact.setBatchTimeInterval(300);
+    fact.setPersistenceEnabled(false);//set persistence to false
+    fact.setDiskSynchronous(true);
+    fact.setMaximumQueueMemory(200);
+    fact.setAlertThreshold(1200);
+    GatewayEventFilter myeventfilter1 = new MyGatewayEventFilter1();
+    fact.addGatewayEventFilter(myeventfilter1);
+    GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamfilter1);
+    GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+    fact.addGatewayTransportFilter(myStreamfilter2);
+    final ExpectedException ex = addExpectedException("Could not connect");
+    try {
+      GatewaySender sender1 = fact.create("TKSender", 2);
+      AttributesFactory factory = new AttributesFactory();
+      factory.addGatewaySenderId(sender1.getId());
+      factory.setDataPolicy(DataPolicy.PARTITION);
+      Region region = cache.createRegionFactory(factory.create()).create(
+          "test_ValidateGatewaySenderAttributes");
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      assertEquals(senders.size(), 1);
+      GatewaySender gatewaySender = senders.iterator().next();
+      Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender)
+          .getQueues();
+      assertEquals(regionQueues.size(), 1);
+      RegionQueue regionQueue = regionQueues.iterator().next();
+      assertEquals(false, regionQueue.getRegion().getAttributes()
+          .isDiskSynchronous());
+    } finally {
+      ex.remove();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
new file mode 100644
index 0000000..79a34c8
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -0,0 +1,791 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+/**
+ * @author skumar
+ * 
+ */
+public class ParallelWANConflationDUnitTest extends WANTestBase {
+  private static final long serialVersionUID = 1L;
+
+  public ParallelWANConflationDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    addExpectedException("java.net.ConnectException");
+  }
+
+  public void testParallelPropagationConflationDisabled() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true  });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true  });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true  });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    pause(3000);
+
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    pause(2000);
+    
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+    
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+    for(int i=0;i<500;i++) {
+      updateKeyValues.put(i, i+"_updated");
+    }
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (keyValues.size() + updateKeyValues.size()) });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, 0 });
+
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+
+    keyValues.putAll(updateKeyValues);
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, keyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName, keyValues.size() });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        testName, keyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        testName, keyValues });
+    
+  }
+
+  /**
+   * This test is desabled as it is not guaranteed to pass it everytime. This
+   * test is related to the conflation in batch. yet did find any way to
+   * ascertain that the vents in the batch will always be conflated.
+   * 
+   * @throws Exception
+   */
+  public void testParallelPropagationBatchConflation() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 50, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 50, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 50, false, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+    true, 100, 50, false, false, null, true });
+  
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    final Map keyValues = new HashMap();
+    
+    for (int i = 1; i <= 100; i++) {
+      for (int j = 1; j <= 10; j++) {
+        keyValues.put(j, i) ;
+      }
+      vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] {
+        testName, keyValues });
+    }
+    
+    vm4.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
+    
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    
+    pause(2000);
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
+        WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(
+        WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(
+        WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(
+        WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
+    
+    getLogWriter().info("KBKBKB: batch conflated events : vm4 : " + v4List.get(8));
+    getLogWriter().info("KBKBKB: batch conflated events : vm5 : " + v5List.get(8));
+    getLogWriter().info("KBKBKB: batch conflated events : vm6 : " + v6List.get(8));
+    getLogWriter().info("KBKBKB: batch conflated events : vm7 : " + v7List.get(8));
+    getLogWriter().info("KBKBKB: batch conflated events : " + (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)));
+    assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName, 10 });
+
+  }
+  
+  public void testParallelPropagationConflation() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    pause(3000);
+
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    pause(2000);
+    
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+    
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+    for(int i=0;i<500;i++) {
+      updateKeyValues.put(i, i+"_updated");
+    }
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, 0 });
+
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+
+    keyValues.putAll(updateKeyValues);
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, keyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName, keyValues.size() });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        testName, keyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        testName, keyValues });
+  }
+  
+
+  /**
+   * Reproduce the bug #47213.
+   * The test is same as above test, with the only difference that 
+   * redundancy is set to 1.
+   * @throws Exception
+   */
+  public void testParallelPropagationConflation_Bug47213() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 2, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 2, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 2, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 2, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    pause(3000);
+
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    pause(2000);//give some time for all the senders to pause
+    
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+    for(int i=0;i<500;i++) {
+      updateKeyValues.put(i, i+"_updated");
+    }
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, 0 });
+
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+
+    keyValues.putAll(updateKeyValues);
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, keyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName, keyValues.size() });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        testName, keyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        testName, keyValues });
+  }
+  
+  public void testParallelPropagationConflationOfRandomKeys() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    pause(3000);
+
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, 1, 100, isOffHeap() });
+
+    pause(2000);
+
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+    
+    while(updateKeyValues.size()!=500) {
+      int key = (new Random()).nextInt(keyValues.size());
+      updateKeyValues.put(key, key+"_updated");
+    }
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, 0 });
+
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+
+    
+    keyValues.putAll(updateKeyValues);
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, keyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName, keyValues.size() });
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+      testName, keyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+      testName, keyValues });
+    
+  }
+  
+  public void testParallelPropagationColocatedRegionConflation()
+      throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+
+    vm4.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, "ln", 0, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, "ln", 0, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, "ln", 0, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, "ln", 0, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    pause(3000);
+
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, null, 1, 100, isOffHeap() });
+
+    pause(2000);
+
+    Map custKeyValues = (Map)vm4.invoke(WANTestBase.class, "putCustomerPartitionedRegion",
+        new Object[] { 1000 });
+    Map orderKeyValues = (Map)vm4.invoke(WANTestBase.class, "putOrderPartitionedRegion",
+        new Object[] { 1000 });
+    Map shipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "putShipmentPartitionedRegion",
+        new Object[] { 1000 });
+
+    vm4.invoke(
+        WANTestBase.class,
+        "checkQueueSize",
+        new Object[] {
+            "ln",
+            (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+                .size()) });
+
+    Map updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
+        new Object[] { 500 });
+    Map updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegion",
+        new Object[] { 500 });
+    Map updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegion",
+        new Object[] { 500 });
+
+    vm4.invoke(
+        WANTestBase.class,
+        "checkQueueSize",
+        new Object[] {
+            "ln",
+            (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+                .size())
+                + updatedCustKeyValues.size()
+                + updatedOrderKeyValues.size()
+                + updatedShipmentKeyValues.size() });
+
+    updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
+        new Object[] { 500 });
+    updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegion",
+        new Object[] { 500 });
+    updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegion",
+        new Object[] { 500 });
+
+    vm4.invoke(
+        WANTestBase.class,
+        "checkQueueSize",
+        new Object[] {
+            "ln",
+            (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+                .size())
+                + updatedCustKeyValues.size()
+                + updatedOrderKeyValues.size()
+                + updatedShipmentKeyValues.size() });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.customerRegionName, 0 });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.orderRegionName, 0 });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.shipmentRegionName, 0 });
+
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    
+    custKeyValues.putAll(updatedCustKeyValues);
+    orderKeyValues.putAll(updatedOrderKeyValues);
+    shipmentKeyValues.putAll(updatedShipmentKeyValues);
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.customerRegionName, custKeyValues.size() });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.orderRegionName, orderKeyValues.size() });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
+
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.customerRegionName, custKeyValues });
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.orderRegionName, orderKeyValues });
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.shipmentRegionName, shipmentKeyValues });
+    
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.customerRegionName, custKeyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.orderRegionName, orderKeyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
+
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.customerRegionName, custKeyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.orderRegionName, orderKeyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.shipmentRegionName, shipmentKeyValues });
+    
+  }
+  
+  public void testParallelPropagationColoatedRegionConflationSameKey()
+      throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+
+    vm4.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, "ln", 0, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, "ln", 0, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, "ln", 0, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, "ln", 0, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    pause(3000);
+
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class,
+        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
+            testName, null, 1, 100, isOffHeap() });
+
+    pause(2000);
+
+    Map custKeyValues = (Map)vm4.invoke(WANTestBase.class, "putCustomerPartitionedRegion",
+        new Object[] { 1000 });
+    Map orderKeyValues = (Map)vm4.invoke(WANTestBase.class, "putOrderPartitionedRegionUsingCustId",
+        new Object[] { 1000 });
+    Map shipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "putShipmentPartitionedRegionUsingCustId",
+        new Object[] { 1000 });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+      .size()) });
+
+    Map updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
+        new Object[] { 500 });
+    Map updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegionUsingCustId",
+        new Object[] { 500 });
+    Map updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegionUsingCustId",
+        new Object[] { 500 });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+        .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() });
+
+    updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
+        new Object[] { 500 });
+    updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegionUsingCustId",
+        new Object[] { 500 });
+    updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegionUsingCustId",
+        new Object[] { 500 });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+        .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.customerRegionName, 0 });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.orderRegionName, 0 });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.shipmentRegionName, 0 });
+
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+
+    custKeyValues.putAll(updatedCustKeyValues);
+    orderKeyValues.putAll(updatedOrderKeyValues);
+    shipmentKeyValues.putAll(updatedShipmentKeyValues);
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.customerRegionName, custKeyValues.size() });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.orderRegionName, orderKeyValues.size() });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
+
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.customerRegionName, custKeyValues });
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.orderRegionName, orderKeyValues });
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.shipmentRegionName, shipmentKeyValues });
+
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.customerRegionName, custKeyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.orderRegionName, orderKeyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
+
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.customerRegionName, custKeyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.orderRegionName, orderKeyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        WANTestBase.shipmentRegionName, shipmentKeyValues });
+  }
+  
+}


Mime
View raw message