geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [02/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Wed, 20 Jan 2016 02:22:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java
new file mode 100644
index 0000000..ddf4901
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java
@@ -0,0 +1,431 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+
+public class SerialWANPropogation_PartitionedRegionDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  public SerialWANPropogation_PartitionedRegionDUnitTest(String name) {
+    super(name); // hands the test name to the WANTestBase constructor
+  }
+
+  public void setUp() throws Exception {
+    super.setUp(); // no test-specific setup; only delegates to the superclass
+  }
+  
+  
+  public void testPartitionedSerialPropagation() throws Exception {
+    // Puts 1000 entries into the testName + "_PR" region from vm4 and validates a region size of 1000 on vm2.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    // vm2/vm3 create receivers with nyPort; vm4-vm7 create caches with lnPort.
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    // Sender "ln" is created on vm4 and vm5 only; argument meanings are defined by WANTestBase.createSender (not shown here).
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    // Only vm4's sender is started; the vm5 start is commented out. NOTE(review): confirm this is intentional.
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    //vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        1000 });
+    // Only vm2 is validated; vm3 is not checked in this test.
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+
+  public void testBothReplicatedAndPartitionedSerialPropagation()
+      throws Exception {
+    // Uses one sender id ("ln") for both testName + "_RR" and testName + "_PR"; expects size 1000 for _RR on vm2 and _PR on vm3.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap() });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    // The sender is started on vm4 and vm5 before either of them creates the regions that name it.
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    // _RR puts are issued from vm4 and _PR puts from vm5.
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+
+  public void testSerialReplicatedAndPartitionedPropagation() throws Exception {
+    // One sender id ("lnSerial") is named by both testName + "_RR" and testName + "_PR"; expects 1000 for _RR on vm2 and _PR on vm3.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial",
+        2, false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial",
+        2, false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap() });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "lnSerial", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "lnSerial", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "lnSerial", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "lnSerial", isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial", 1, 100, isOffHeap() });
+    // Here the senders are started only after all regions on vm4-vm7 have been created.
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial" });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+
+  public void testSerialReplicatedAndSerialPartitionedPropagation()
+      throws Exception {
+    // Uses two sender ids: testName + "_RR" names "lnSerial1" and testName + "_PR" names "lnSerial2"; expects 1000 on vm2 (_RR) and vm3 (_PR).
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial1",
+        2, false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial1",
+        2, false, 100, 10, false, false, null, true });
+    // lnSerial2 is created on vm5 and vm6, so vm5 is the only member that creates both senders.
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial2",
+        2, false, 100, 10, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial2",
+        2, false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap() });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    // _RR names "lnSerial1" on all of vm4-vm7, although only vm4 and vm5 created that sender.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "lnSerial1", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "lnSerial1", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "lnSerial1", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "lnSerial1", isOffHeap() });
+    // _PR names "lnSerial2" on all of vm4-vm7, although only vm5 and vm6 created that sender.
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial2", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial2", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial2", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial2", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial1" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial1" });
+
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial2" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial2" });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+  
+  public void testPartitionedSerialPropagationToTwoWanSites() throws Exception {
+    // testName + "_PR" names two senders; expects size 1000 on vm2 and on vm3, whose receivers use different locator ports (nyPort, tkPort).
+    Integer lnPort = createFirstLocatorWithDSId(1); // called directly here rather than through vm0.invoke as in the other tests
+    Integer nyPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] {2, lnPort });
+    Integer tkPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] {3,lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] {nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver",
+        new Object[] {tkPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial1",
+        2, false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial1",
+        2, false, 100, 10, false, false, null, true });
+    // lnSerial2 differs from lnSerial1 in the second argument (3 vs 2), matching the first arguments of the two createFirstRemoteLocator calls.
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial2",
+        3, false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "lnSerial2",
+        3, false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial1" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial1" });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial2" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial2" });
+    // Both sender ids are passed as a single comma-separated string.
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+
+  public void testPartitionedSerialPropagationHA() throws Exception { // killSender runs on vm4 while vm5 does async puts; vm2 and vm3 must still report size 1000
+    addExpectedException("Broken pipe");
+    addExpectedException("Connection reset");
+    addExpectedException("Unexpected IOException");
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+
+    // Do an initial 100 puts to create all the buckets.
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 100 });
+    // Three more expected exceptions (by class name) are registered just before the disruptive step.
+    addExpectedException(CancelException.class.getName());
+    addExpectedException(CacheClosedException.class.getName());
+    addExpectedException(ForceReattemptException.class.getName());
+    // Start the puts asynchronously on vm5 so that vm4 can be disrupted while they run.
+    AsyncInvocation inv = vm5.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { testName + "_PR", 1000 });
+    // Invoke killSender on vm4 in between the puts (original note: closes the cache on vm4; killSender's body is not shown here).
+    vm4.invoke(WANTestBase.class, "killSender", new Object[] {});
+    // NOTE(review): inv's outcome is not inspected after join(); only the region sizes below are checked.
+    inv.join();
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+
+  public void testPartitionedSerialPropagationWithParallelThreads()
+      throws Exception {
+    // Runs doMultiThreadedPuts with 1000 on vm4 against testName + "_PR" and validates a region size of 1000 on vm2.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    // The puts come from doMultiThreadedPuts rather than doPuts; its threading details live in WANTestBase (not shown here).
+    vm4.invoke(WANTestBase.class, "doMultiThreadedPuts", new Object[] {
+        testName + "_PR", 1000 });
+    // Only vm2 is validated; vm3 is not checked in this test.
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java
new file mode 100644
index 0000000..d0b984d
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java
@@ -0,0 +1,364 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
+
+
+public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
+
+  private static final long serialVersionUID = 1L;
+
+  public SerialWANPropogationsFeatureDUnitTest(String name) {
+    super(name); // hands the test name to the WANTestBase constructor
+  }
+
+  public void setUp() throws Exception {
+    super.setUp(); // no test-specific setup; only delegates to the superclass
+  }
+  
+  public void testSerialReplicatedWanWithOverflow() {
+    // Runs doHeavyPuts with 120 on vm4 against testName + "_RR" and validates a region size of 120 on vm2 and vm3.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, "createFirstLocatorWithDSId",
+        new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] {nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver",
+        new Object[] {nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    // NOTE(review): the test name suggests doHeavyPuts is meant to overflow the sender queue - confirm in WANTestBase.
+    vm4.invoke(WANTestBase.class, "doHeavyPuts", new Object[] {
+        testName + "_RR", 120 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 120 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 120 });
+  }
+
+  public void testSerialReplicatedWanWithPersistence() {
+    // Puts 1000 entries into testName + "_RR" from vm4 via a sender whose seventh createSender argument is true; validates 1000 on vm2 and vm3.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    // NOTE(review): presumably the seventh argument (true) enables sender persistence, per the test name - confirm in WANTestBase.createSender.
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, true, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, true, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+
+  }
+
+  public void testReplicatedSerialPropagationWithConflation() throws Exception {
+    // Puts 1000 entries into testName + "_RR" from vm4 via a sender created with 1000 and true as fifth and sixth arguments; validates 1000 on vm2 and vm3.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    // NOTE(review): presumably the fifth argument is the batch size and the sixth enables conflation, per the test name - confirm in WANTestBase.
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 1000, true, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 1000, true, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+  }
+  
+  public void testReplicatedSerialPropagationWithParallelThreads()
+      throws Exception {
+    // Runs doMultiThreadedPuts with 1000 on vm4 against testName + "_RR" and validates a region size of 1000 on vm2 and vm3.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    // The puts come from doMultiThreadedPuts rather than doPuts; its threading details live in WANTestBase (not shown here).
+    vm4.invoke(WANTestBase.class, "doMultiThreadedPuts", new Object[] {
+        testName + "_RR", 1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 1000 });
+  }
+  
+  /**
+   * Propagates a partitioned region through sender "ln" with a
+   * MyGatewayEventFilter installed on the sender; of the 1000 puts, the
+   * receiver on vm2 is expected to hold 800 entries.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testSerialPropogationWithFilter() throws Exception {
+    final String senderId = "ln";
+
+    // Locators: vm0 first, then vm1 as a remote locator that is handed vm0's port.
+    final Integer localLocatorPort = (Integer)vm0.invoke(
+        WANTestBase.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remoteLocatorPort = (Integer)vm1.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 2, localLocatorPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+    vm3.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+
+    vm4.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm5.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm6.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm7.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+
+    // Each sender gets its own filter instance.
+    vm4.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter(), true });
+    vm5.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter(), true });
+
+    // Sending-side partitioned regions are created before the senders start.
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, senderId, 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, senderId, 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, senderId, 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, senderId, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+
+    // Receiving-side regions carry no sender id.
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, null, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 });
+
+    // Only vm2 is checked here.
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { testName, 800 });
+  }
+
+  /**
+   * Propagates a replicated region through sender "ln" with a
+   * MyGatewayEventFilter installed on the sender; of the 1000 puts, both
+   * receivers (vm2 and vm3) are expected to hold 800 entries.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testReplicatedSerialPropagationWithFilter() throws Exception {
+    final String senderId = "ln";
+
+    // Locators: vm0 first, then vm1 as a remote locator that is handed vm0's port.
+    final Integer localLocatorPort = (Integer)vm0.invoke(
+        WANTestBase.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remoteLocatorPort = (Integer)vm1.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 2, localLocatorPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+    vm3.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+
+    vm4.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm5.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm6.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm7.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+
+    // Each sender gets its own filter instance.
+    vm4.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter(), true });
+    vm5.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter(), true });
+
+    // Receiving-side regions carry no sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, null, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+
+    // Sending-side regions are attached to the sender on all four members.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, senderId, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { testName, 800 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { testName, 800 });
+  }
+  
+  /**
+   * Propagates a replicated region through sender "ln" with a
+   * MyGatewayEventFilter_AfterAck installed, then checks that both sender
+   * queues are validated against 0, that the values returned by
+   * validateAfterAck on vm4 and vm5 add up to 2000, and that both receivers
+   * hold 1000 entries.
+   * In this test the receivers live on vm6/vm7 and the sending-side caches on
+   * vm2-vm5.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testReplicatedSerialPropagationWithFilter_AfterAck()
+      throws Exception {
+    final String senderId = "ln";
+
+    // Locators: vm0 first, then vm1 as a remote locator that is handed vm0's port.
+    final Integer localLocatorPort = (Integer)vm0.invoke(
+        WANTestBase.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remoteLocatorPort = (Integer)vm1.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 2, localLocatorPort });
+
+    vm6.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+    vm7.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+
+    vm2.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm3.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm4.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm5.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+
+    // Each sender gets its own filter instance.
+    vm4.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter_AfterAck(), true });
+    vm5.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter_AfterAck(), true });
+
+    // Receiving-side regions carry no sender id.
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, null, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, null, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+
+    // Sending-side regions are attached to the sender on all four members.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, senderId, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, senderId, isOffHeap() });
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { testName, senderId, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 });
+
+    vm4.invoke(WANTestBase.class, "validateQueueContents",
+        new Object[] { senderId, 0 });
+    vm5.invoke(WANTestBase.class, "validateQueueContents",
+        new Object[] { senderId, 0 });
+
+    final Integer acksOnVm4 = (Integer)vm4.invoke(
+        WANTestBase.class, "validateAfterAck", new Object[] { senderId });
+    final Integer acksOnVm5 = (Integer)vm5.invoke(
+        WANTestBase.class, "validateAfterAck", new Object[] { senderId });
+
+    // The two senders together must account for 2000.
+    final int totalAcks = acksOnVm4 + acksOnVm5;
+    assertEquals(2000, totalAcks);
+
+    vm6.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { testName, 1000 });
+    vm7.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { testName, 1000 });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
new file mode 100644
index 0000000..dff58bf
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -0,0 +1,588 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
+
+import dunit.AsyncInvocation;
+
+public class SerialWANStatsDUnitTest extends WANTestBase {
+  
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the test case, forwarding the given name to the WANTestBase
+   * constructor.
+   *
+   * @param name name passed through to the superclass
+   */
+  public SerialWANStatsDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the superclass set-up and then registers three exception strings
+   * through addExpectedException (presumably so that they are tolerated when
+   * they show up during a test -- confirm against the base class).
+   *
+   * @throws Exception if the superclass set-up fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    addExpectedException("java.net.ConnectException");
+    addExpectedException("java.net.SocketException");
+    addExpectedException("Unexpected IOException");
+  }
+  
+  /**
+   * Propagates 1000 puts on a replicated region through sender "ln" to a
+   * single receiver (vm2) and then checks receiver, queue and batch stats.
+   * The puts are issued on vm5; the stats expectations differ between vm4
+   * and vm5 as spelled out in the check invocations below.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testReplicatedSerialPropagation() throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName + "_RR";
+
+    // Locators: vm0 first, then vm1 as a remote locator that is handed vm0's port.
+    final Integer localLocatorPort = (Integer)vm0.invoke(
+        WANTestBase.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remoteLocatorPort = (Integer)vm1.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 2, localLocatorPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+
+    vm4.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm5.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm6.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm7.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+
+    vm4.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false, null, true });
+
+    // Receiving-side region carries no sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, null, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+
+    // Sending-side regions are attached to the sender on all four members.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, senderId, isOffHeap() });
+
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { regionName, 1000 });
+
+    // Wait two seconds before reading stats.
+    pause(2000);
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats",
+        new Object[] { 100, 1000, 1000 });
+
+    vm4.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { senderId, 0, 1000, 1000, 1000 });
+    vm4.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { senderId, 100 });
+
+    vm5.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { senderId, 0, 1000, 0, 0 });
+    vm5.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { senderId, 0 });
+  }
+  
+  /**
+   * Same scenario as a plain replicated propagation to one receiver (vm2),
+   * except that sender "ln" is created through
+   * createSenderWithMultipleDispatchers with the extra arguments 2 and
+   * OrderPolicy.KEY. Receiver, queue and batch stats are checked afterwards.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName + "_RR";
+
+    // Locators: vm0 first, then vm1 as a remote locator that is handed vm0's port.
+    final Integer localLocatorPort = (Integer)vm0.invoke(
+        WANTestBase.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remoteLocatorPort = (Integer)vm1.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 2, localLocatorPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+
+    vm4.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm5.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm6.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm7.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+
+    vm4.invoke(WANTestBase.class, "createSenderWithMultipleDispatchers",
+        new Object[] { senderId, 2, false, 100, 10, false, false, null, true,
+            2, OrderPolicy.KEY });
+    vm5.invoke(WANTestBase.class, "createSenderWithMultipleDispatchers",
+        new Object[] { senderId, 2, false, 100, 10, false, false, null, true,
+            2, OrderPolicy.KEY });
+
+    // Receiving-side region carries no sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, null, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+
+    // Sending-side regions are attached to the sender on all four members.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, senderId, isOffHeap() });
+
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { regionName, 1000 });
+
+    // Wait two seconds before reading stats.
+    pause(2000);
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats",
+        new Object[] { 100, 1000, 1000 });
+
+    vm4.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { senderId, 0, 1000, 1000, 1000 });
+    vm4.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { senderId, 100 });
+
+    vm5.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { senderId, 0, 1000, 0, 0 });
+    vm5.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { senderId, 0 });
+  }
+  
+  /**
+   * Propagates 1000 puts on a replicated region to two receivers through two
+   * senders, "lnSerial1" (created with 2) and "lnSerial2" (created with 3),
+   * then checks receiver, queue and batch stats for both senders.
+   * Here the first locator is created by calling createFirstLocatorWithDSId
+   * directly in this VM, while vm0 and vm1 each host a remote locator.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testWANStatsTwoWanSites() throws Exception {
+    final String firstSender = "lnSerial1";
+    final String secondSender = "lnSerial2";
+    final String regionName = testName + "_RR";
+
+    final Integer localLocatorPort = createFirstLocatorWithDSId(1);
+    final Integer firstRemotePort = (Integer)vm0.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 2, localLocatorPort });
+    final Integer secondRemotePort = (Integer)vm1.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 3, localLocatorPort });
+
+    // One receiver per remote locator.
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { firstRemotePort });
+    vm3.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { secondRemotePort });
+
+    vm4.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm5.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm6.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm7.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+
+    vm4.invoke(WANTestBase.class, "createSender",
+        new Object[] { firstSender, 2, false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender",
+        new Object[] { firstSender, 2, false, 100, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createSender",
+        new Object[] { secondSender, 3, false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender",
+        new Object[] { secondSender, 3, false, 100, 10, false, false, null, true });
+
+    // Receiving-side regions carry no sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, null, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { firstSender });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { firstSender });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { secondSender });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { secondSender });
+
+    // Sending-side regions are attached to both senders on all four members.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "lnSerial1,lnSerial2", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "lnSerial1,lnSerial2", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "lnSerial1,lnSerial2", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "lnSerial1,lnSerial2", isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { regionName, 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { regionName, 1000 });
+
+    // Wait two seconds before reading stats.
+    pause(2000);
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats",
+        new Object[] { 100, 1000, 1000 });
+    vm3.invoke(WANTestBase.class, "checkGatewayReceiverStats",
+        new Object[] { 100, 1000, 1000 });
+
+    // vm4: both senders are checked against the full set of values.
+    vm4.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { firstSender, 0, 1000, 1000, 1000 });
+    vm4.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { firstSender, 100 });
+    vm4.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { secondSender, 0, 1000, 1000, 1000 });
+    vm4.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { secondSender, 100 });
+
+    // vm5: both senders are checked against zeroed trailing values.
+    vm5.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { firstSender, 0, 1000, 0, 0 });
+    vm5.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { firstSender, 0 });
+    vm5.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { secondSender, 0, 1000, 0, 0 });
+    vm5.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { secondSender, 0 });
+  }
+
+  /**
+   * Propagates 10000 puts on a replicated region through sender "ln" while
+   * killSender is invoked on vm4 part-way through (and on vm5 as well when
+   * the first call reports that nothing was killed). Afterwards the receiver
+   * on vm2 must hold all 10000 entries, and the receiver HA stats and the
+   * failover stats on vm5 are checked.
+   *
+   * @throws Exception if any remote invocation or join fails
+   */
+  public void testReplicatedSerialPropagationHA() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap()  });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap()  });
+
+    // Puts run asynchronously on vm5 so that a sender can be killed mid-stream.
+    AsyncInvocation inv1 = vm5.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { testName + "_RR", 10000 });
+    pause(2000);
+    AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "killSender", new Object[] { "ln" });
+    Boolean isKilled = Boolean.FALSE;
+    try {
+      isKilled = (Boolean)inv2.getResult();
+    }
+    catch (Throwable e) {
+      // Fail the test, but keep the underlying exception as the cause so the
+      // report shows why killing the sender went wrong.
+      AssertionError failure = new AssertionError("Unexpected exception while killing a sender");
+      failure.initCause(e);
+      throw failure;
+    }
+    if(!isKilled){
+      // vm4 reported that nothing was killed, so kill the sender on vm5 instead.
+      AsyncInvocation inv3 = vm5.invokeAsync(WANTestBase.class, "killSender", new Object[] { "ln" });
+      inv3.join();
+    }
+    inv1.join();
+    inv2.join();
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 10000 });
+
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStatsHA", new Object[] {1000, 10000, 10000 });
+
+    vm5.invoke(WANTestBase.class, "checkStats_Failover", new Object[] {"ln", 10000});
+  }
+  
+  /**
+   * Propagates puts on two replicated regions (RR_1 with 1000 puts, RR_2 with
+   * 500 puts) through sender "ln" and then checks queue, batch and
+   * unprocessed-event stats on vm4 and vm5.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testReplicatedSerialPropagationUNPorcessedEvents() throws Exception {
+    final String senderId = "ln";
+    final String firstRegion = testName + "_RR_1";
+    final String secondRegion = testName + "_RR_2";
+
+    final Integer localLocatorPort = (Integer)vm0.invoke(
+        WANTestBase.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remoteLocatorPort = (Integer)vm1.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 2, localLocatorPort });
+
+    // Remote site: two receivers.
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+    vm3.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+
+    // Local site: four caches.
+    vm4.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm5.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm6.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm7.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+
+    // Local site: the sender exists only on vm4 and vm5.
+    vm4.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 20, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 20, false, false, null, true });
+
+    // Remote site: RR_1 on both receivers.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { firstRegion, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { firstRegion, null, isOffHeap() });
+
+    // Remote site: RR_2 on both receivers.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { secondRegion, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { secondRegion, null, isOffHeap() });
+
+    // Local site: start the senders.
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+
+    // Local site: RR_1 attached to the sender on all four members.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { firstRegion, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { firstRegion, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { firstRegion, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { firstRegion, senderId, isOffHeap() });
+
+    // Local site: RR_2 attached to the sender on all four members.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { secondRegion, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { secondRegion, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { secondRegion, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { secondRegion, senderId, isOffHeap() });
+
+    // Puts into RR_1 and then RR_2, both issued on vm4 through invoke (not
+    // invokeAsync), i.e. one after the other.
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { firstRegion, 1000 });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { secondRegion, 500 });
+
+    // Short 20 ms pause before validating the remote site.
+    // (A verifyQueueSize check for "ln" on vm4 was disabled at this point.)
+    Thread.sleep(20);
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { firstRegion, 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { secondRegion, 500 });
+
+    // Wait two seconds before reading stats.
+    pause(2000);
+    vm4.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { senderId, 0, 1500, 1500, 1500 });
+    vm4.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { senderId, 75 });
+    vm4.invoke(WANTestBase.class, "checkUnProcessedStats",
+        new Object[] { senderId, 0 });
+
+    vm5.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { senderId, 0, 1500, 0, 0 });
+    vm5.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { senderId, 0 });
+    vm5.invoke(WANTestBase.class, "checkUnProcessedStats",
+        new Object[] { senderId, 1500 });
+  }
+  
+  /**
+   * Disabled - see ticket #52118
+   * (NOTE(review): the method name still starts with "test", so it is not
+   * obviously excluded from the run -- confirm how it is disabled.)
+   *
+   * 1 region and sender configured on local site and 1 region and a
+   * receiver configured on remote site. Puts to the local region are in progress.
+   * Remote region is destroyed in the middle.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    //this is part of remote site
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    //these are part of local site
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    //senders are created on local site
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 100, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 100, false, false, null, true });
+
+    //create one RR (RR_1) on remote site
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR_1", null, isOffHeap()  });
+
+    //start the senders on local site
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    //create one RR (RR_1) on local site
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap()  });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap()  });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap()  });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap()  });
+
+    //start puts in RR_1 in another thread
+    AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_RR_1", 20000 });
+    //destroy RR_1 in remote site
+    vm2.invoke(WANTestBase.class, "destroyRegion", new Object[] { testName + "_RR_1", 500});
+
+    try {
+      inv1.join();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag and fail with the cause attached instead of
+      // printing the stack trace and failing without a message.
+      Thread.currentThread().interrupt();
+      AssertionError failure = new AssertionError(
+          "Interrupted while waiting for the puts on vm4 to finish");
+      failure.initCause(e);
+      throw failure;
+    }
+
+    //verify that all is well in local site. All the events should be present in local region
+    vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR_1", 20000 });
+    //assuming some events might have been dispatched before the remote region was destroyed,
+    //sender's region queue will have events less than 1000 but the queue will not be empty.
+    //NOTE(review): "less than 1000" does not obviously match the 20000 puts above - confirm.
+    //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of
+    //more in depth validations.
+    vm4.invoke(WANTestBase.class, "verifyRegionQueueNotEmpty", new Object[] {"ln" });
+
+    vm4.invoke(WANTestBase.class, "checkBatchStats", new Object[] {"ln", true, true});
+
+    vm5.invoke(WANTestBase.class, "checkUnProcessedStats", new Object[] {"ln", 20000});
+
+    vm2.invoke(WANTestBase.class, "checkExcepitonStats", new Object[] {1});
+
+  }
+  
+  /**
+   * Propagates a partitioned region through sender "ln" with a
+   * MyGatewayEventFilter installed; of the 1000 puts, the receiver on vm2 is
+   * expected to hold 800 entries. Afterwards queue, event-filtered, batch
+   * and unprocessed-event stats are checked on vm4 and vm5.
+   *
+   * @throws Exception if any remote invocation fails
+   */
+  public void testSerialPropogationWithFilter() throws Exception {
+    final String senderId = "ln";
+
+    // Locators: vm0 first, then vm1 as a remote locator that is handed vm0's port.
+    final Integer localLocatorPort = (Integer)vm0.invoke(
+        WANTestBase.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remoteLocatorPort = (Integer)vm1.invoke(
+        WANTestBase.class, "createFirstRemoteLocator",
+        new Object[] { 2, localLocatorPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+    vm3.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remoteLocatorPort });
+
+    vm4.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm5.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm6.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+    vm7.invoke(WANTestBase.class, "createCache",
+        new Object[] { localLocatorPort });
+
+    // Each sender gets its own filter instance.
+    vm4.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter(), true });
+    vm5.invoke(WANTestBase.class, "createSender",
+        new Object[] { senderId, 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter(), true });
+
+    // Sending-side partitioned regions are created before the senders start.
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, senderId, 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, senderId, 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, senderId, 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, senderId, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { senderId });
+
+    // Receiving-side regions carry no sender id.
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { testName, null, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 });
+
+    // Only vm2 is checked for the region size.
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { testName, 800 });
+
+    // Wait two seconds before reading stats.
+    pause(2000);
+    vm4.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { senderId, 0, 1000, 900, 800 });
+    vm4.invoke(WANTestBase.class, "checkEventFilteredStats",
+        new Object[] { senderId, 200 });
+    vm4.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { senderId, 80 });
+    vm4.invoke(WANTestBase.class, "checkUnProcessedStats",
+        new Object[] { senderId, 0 });
+
+    vm5.invoke(WANTestBase.class, "checkQueueStats",
+        new Object[] { senderId, 0, 1000, 0, 0 });
+    vm5.invoke(WANTestBase.class, "checkBatchStats",
+        new Object[] { senderId, 0 });
+    vm5.invoke(WANTestBase.class, "checkUnProcessedStats",
+        new Object[] { senderId, 900 });
+  }
+  
+  /**
+   * Pauses the "ln" sender on vm4, queues 1000 puts followed by two rounds of
+   * updates to the first 500 keys, and checks the queue size after each round.
+   * It then resumes the sender and validates the size and contents of the
+   * regions on vm2/vm3 as well as the sender statistics on vm4.
+   *
+   * NOTE(review): the sender is created with {@code true} in the sixth
+   * argument position (presumably "batch conflation enabled" - confirm
+   * against WANTestBase.createSender).
+   */
+  public void testSerialPropagationConflation() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    // Only vm4 creates (and later starts) the "ln" sender in this test.
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, true, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap()  });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap()  });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap()  });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", 0, 100, isOffHeap()  });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    // The sender is paused before any puts, so the queue-size checks below
+    // see every queued event.
+    vm4.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null,1, 100, isOffHeap()  });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null,1, 100, isOffHeap()  });
+
+    // Initial data set: keys 0..999, each mapped to itself.
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+
+    pause(5000);
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+    // Updates for keys 0..499 with the value i + "_updated".
+    for(int i=0;i<500;i++) {
+      updateKeyValues.put(i, i+"_updated");
+    }
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+
+    pause(5000);
+    
+    // After the first round of updates the expected queue size is 1000 + 500.
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size()  + updateKeyValues.size() });
+
+    // Nothing is expected on the remote side while the sender is paused.
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, 0 });
+    
+    // The same 500 updates are put a second time.
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+
+    pause(5000);
+    
+    // The expected queue size is still 1000 + 500, i.e. the second round of
+    // updates must not grow the queue.
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size()  + updateKeyValues.size() });
+
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+
+    // The updates overwrite keys 0..499, so keyValues keeps 1000 entries and
+    // now holds the final expected value for every key.
+    keyValues.putAll(updateKeyValues);
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName, keyValues.size() });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName, keyValues.size() });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        testName, keyValues });
+    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
+        testName, keyValues });
+    
+    pause(2000);
+    // 1000 + 500 + 500 = 2000 puts were issued in total.
+    // NOTE(review): the meaning of each numeric argument is defined by the
+    // check* helpers in WANTestBase, which are not visible here.
+    vm4.invoke(WANTestBase.class, "checkQueueStats", new Object[] {"ln",
+      0, 2000, 2000, 1500});
+    vm4.invoke(WANTestBase.class, "checkConflatedStats", new Object[] {"ln",
+      500});
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java
new file mode 100644
index 0000000..6f9347c
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java
@@ -0,0 +1,505 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.wancommand;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.management.remote.JMXConnectorServer;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.management.ManagementService;
+import com.gemstone.gemfire.management.internal.cli.commands.CliCommandTestBase;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+public class WANCommandTestBase extends CliCommandTestBase{
+
+  /** Cache assigned by the static create* helpers of this class and closed by closeCache(). */
+  static Cache cache;
+  // NOTE(review): the two fields below are not referenced anywhere else in this class.
+  private JMXConnectorServer jmxConnectorServer;
+  private ManagementService managementService;
+//  public String jmxHost;
+//  public int jmxPort;
+
+  // VM handles 0..7 of host 0; assigned in setUp().
+  static VM vm0;
+  static VM vm1;
+  static VM vm2;
+  static VM vm3;
+  static VM vm4;
+  static VM vm5;
+  static VM vm6;
+  static VM vm7;
+
+  /**
+   * Creates the test case and passes {@code name} on to the superclass constructor.
+   *
+   * @param name the test name handed to the superclass
+   */
+  public WANCommandTestBase(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    final Host host = Host.getHost(0);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
+    vm2 = host.getVM(2);
+    vm3 = host.getVM(3);
+    vm4 = host.getVM(4);
+    vm5 = host.getVM(5);
+    vm6 = host.getVM(6);
+    vm7 = host.getVM(7);
+    enableManagement();
+  }
+
+  /**
+   * Connects to a distributed system whose locators and start-locator
+   * properties both point at a randomly chosen local port, then creates the
+   * cache on that system and stores it in {@link #cache}.
+   *
+   * @param dsId value written to the distributed-system-id property
+   * @return the randomly chosen port used in the locator properties
+   */
+  public static Integer createFirstLocatorWithDSId(int dsId) {
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+    int locatorPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    String locatorAddress = "localhost[" + locatorPort + "]";
+
+    Properties config = new Properties();
+    config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    config.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, String.valueOf(dsId));
+    config.setProperty(DistributionConfig.LOCATORS_NAME, locatorAddress);
+    config.setProperty(DistributionConfig.START_LOCATOR_NAME,
+        locatorAddress + ",server=true,peer=true,hostname-for-clients=localhost");
+
+    cache = CacheFactory.create(testInstance.getSystem(config));
+    return locatorPort;
+  }
+
+  /**
+   * Connects to a distributed system whose locators and start-locator
+   * properties point at a randomly chosen local port and whose
+   * remote-locators property points at {@code remoteLocPort}. No cache is
+   * created here.
+   *
+   * @param dsId value written to the distributed-system-id property
+   * @param remoteLocPort port used in the remote-locators property
+   * @return the randomly chosen port used in the locator properties
+   */
+  public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+    int locatorPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    String locatorAddress = "localhost[" + locatorPort + "]";
+
+    Properties config = new Properties();
+    config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    config.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, String.valueOf(dsId));
+    config.setProperty(DistributionConfig.LOCATORS_NAME, locatorAddress);
+    config.setProperty(DistributionConfig.START_LOCATOR_NAME,
+        locatorAddress + ",server=true,peer=true,hostname-for-clients=localhost");
+    config.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME,
+        "localhost[" + remoteLocPort + "]");
+
+    testInstance.getSystem(config);
+    return locatorPort;
+  }
+
+  /**
+   * Connects to the distributed system reachable through the locator at
+   * {@code localhost[locPort]} and stores a newly created cache in
+   * {@link #cache}.
+   *
+   * @param locPort port used in the locators property
+   */
+  public static void createCache(Integer locPort){
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+
+    Properties config = new Properties();
+    config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+
+    cache = CacheFactory.create(testInstance.getSystem(config));
+  }
+
+  /**
+   * Same as {@code createCache(Integer)}, but additionally sets the groups
+   * property before connecting.
+   *
+   * @param locPort port used in the locators property
+   * @param groups value written to the groups property
+   */
+  public static void createCacheWithGroups(Integer locPort, String groups){
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+
+    Properties config = new Properties();
+    config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    config.setProperty(DistributionConfig.GROUPS_NAME, groups);
+
+    cache = CacheFactory.create(testInstance.getSystem(config));
+  }
+
+  /**
+   * Creates a gateway sender named {@code dsName} on {@link #cache}, backed
+   * by a disk store of the same name in a freshly created directory.
+   *
+   * The parallel and serial cases used to be two copies of the same code,
+   * differing only in the {@code setParallel(true)} call; they now share one
+   * path. The disk store is created and assigned whether or not persistence
+   * is enabled, exactly as before.
+   *
+   * @param dsName id of the sender and name of its disk store
+   * @param remoteDsId remote distributed system id passed to the factory
+   * @param isParallel whether {@code setParallel(true)} is applied
+   * @param maxMemory value for {@code setMaximumQueueMemory}
+   * @param batchSize value for {@code setBatchSize}
+   * @param isConflation value for {@code setBatchConflationEnabled}
+   * @param isPersistent whether {@code setPersistenceEnabled(true)} is applied
+   * @param filter event filter to add, or {@code null} for none
+   * @param isManulaStart value for {@code setManualStart}
+   */
+  public static void createSender(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      GatewayEventFilter filter, boolean isManulaStart) {
+    // Timestamp and VM number keep the directory name distinct per call and per VM.
+    File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File [] dirs1 = new File[] {persistentDirectory};
+
+    GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+    if (isParallel) {
+      gateway.setParallel(true);
+    }
+    gateway.setMaximumQueueMemory(maxMemory);
+    gateway.setBatchSize(batchSize);
+    gateway.setManualStart(isManulaStart);
+    if (filter != null) {
+      gateway.addGatewayEventFilter(filter);
+    }
+    gateway.setBatchConflationEnabled(isConflation);
+    if (isPersistent) {
+      gateway.setPersistenceEnabled(true);
+    }
+    DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+    gateway.setDiskStoreName(store.getName());
+    gateway.create(dsName, remoteDsId);
+  }
+
+  /**
+   * Starts the gateway sender with the given id on {@link #cache}.
+   *
+   * Fails the test with an explicit message when no sender with that id
+   * exists; previously this case surfaced as a bare NullPointerException.
+   * "Could not connect" is registered as an expected exception for the
+   * duration of the call and removed again in all cases.
+   *
+   * @param senderId id of the sender to start
+   */
+  public static void startSender(String senderId){
+    final ExpectedException exln = addExpectedException("Could not connect");
+    try {
+      GatewaySender sender = null;
+      for (GatewaySender s : cache.getGatewaySenders()) {
+        if (s.getId().equals(senderId)) {
+          sender = s;
+          break;
+        }
+      }
+      if (sender == null) {
+        fail("No GatewaySender with id " + senderId + " exists in this cache");
+      }
+      sender.start();
+    } finally {
+      exln.remove();
+    }
+  }
+
+  /**
+   * Pauses the gateway sender with the given id on {@link #cache}.
+   *
+   * Fails the test with an explicit message when no sender with that id
+   * exists; previously this case surfaced as a bare NullPointerException.
+   * "Could not connect" is registered as an expected exception for the
+   * duration of the call and removed again in all cases.
+   *
+   * @param senderId id of the sender to pause
+   */
+  public static void pauseSender(String senderId){
+    final ExpectedException exln = addExpectedException("Could not connect");
+    try {
+      GatewaySender sender = null;
+      for (GatewaySender s : cache.getGatewaySenders()) {
+        if (s.getId().equals(senderId)) {
+          sender = s;
+          break;
+        }
+      }
+      if (sender == null) {
+        fail("No GatewaySender with id " + senderId + " exists in this cache");
+      }
+      sender.pause();
+    } finally {
+      exln.remove();
+    }
+  }
+
+  /**
+   * Creates the cache (stored in {@link #cache}) for the locator at
+   * {@code localhost[locPort]}, creates a gateway receiver bound to a single
+   * randomly chosen port and starts it. The test fails if starting throws an
+   * {@link IOException}.
+   *
+   * @param locPort port used in the locators property
+   * @return the port used as both start and end port of the receiver
+   */
+  public static int createAndStartReceiver(int locPort) {
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+
+    Properties config = new Properties();
+    config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    cache = CacheFactory.create(testInstance.getSystem(config));
+
+    GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+    int receiverPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    // Start port == end port pins the receiver to exactly this port.
+    receiverFactory.setStartPort(receiverPort);
+    receiverFactory.setEndPort(receiverPort);
+    receiverFactory.setManualStart(true);
+
+    GatewayReceiver createdReceiver = receiverFactory.create();
+    try {
+      createdReceiver.start();
+    } catch (IOException startFailure) {
+      startFailure.printStackTrace();
+      fail("Test " + testInstance.getName() + " failed to start GatewayRecevier");
+    }
+    return receiverPort;
+  }
+
+  /**
+   * Creates the cache (stored in {@link #cache}) for the locator at
+   * {@code localhost[locPort]} and creates a manual-start gateway receiver
+   * bound to a single randomly chosen port. The receiver is not started here.
+   *
+   * @param locPort port used in the locators property
+   * @return the port used as both start and end port of the receiver
+   */
+  public static int createReceiver(int locPort) {
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+
+    Properties config = new Properties();
+    config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    cache = CacheFactory.create(testInstance.getSystem(config));
+
+    GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+    int receiverPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    receiverFactory.setStartPort(receiverPort);
+    receiverFactory.setEndPort(receiverPort);
+    receiverFactory.setManualStart(true);
+    receiverFactory.create();
+    return receiverPort;
+  }
+
+  /**
+   * Same as {@code createReceiver(int)}, but additionally sets the groups
+   * property before connecting. The receiver is created with manual start
+   * and is not started here.
+   *
+   * @param locPort port used in the locators property
+   * @param groups value written to the groups property
+   * @return the port used as both start and end port of the receiver
+   */
+  public static int createReceiverWithGroup(int locPort, String groups) {
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+
+    Properties config = new Properties();
+    config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    config.setProperty(DistributionConfig.GROUPS_NAME, groups);
+    cache = CacheFactory.create(testInstance.getSystem(config));
+
+    GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+    int receiverPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    receiverFactory.setStartPort(receiverPort);
+    receiverFactory.setEndPort(receiverPort);
+    receiverFactory.setManualStart(true);
+    receiverFactory.create();
+    return receiverPort;
+  }
+
+  /**
+   * Starts every gateway receiver registered on {@link #cache}. The test
+   * fails as soon as one of them throws an {@link IOException} while
+   * starting.
+   */
+  public static void startReceiver() {
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+    try {
+      for (GatewayReceiver each : cache.getGatewayReceivers()) {
+        each.start();
+      }
+    } catch (IOException startFailure) {
+      startFailure.printStackTrace();
+      fail("Test " + testInstance.getName() + " failed to start GatewayRecevier");
+    }
+  }
+
+  /**
+   * Stops every gateway receiver registered on {@link #cache}.
+   */
+  public static void stopReceiver() {
+    // The instance is not used afterwards; the construction is kept because
+    // the original performs it as well.
+    new WANCommandTestBase(testName);
+    for (GatewayReceiver each : cache.getGatewayReceivers()) {
+      each.stop();
+    }
+  }
+
+  /**
+   * Same as {@code createAndStartReceiver(int)}, but additionally sets the
+   * groups property before connecting. The test fails if starting the
+   * receiver throws an {@link IOException}.
+   *
+   * @param locPort port used in the locators property
+   * @param groups value written to the groups property
+   * @return the port used as both start and end port of the receiver
+   */
+  public static int createAndStartReceiverWithGroup(int locPort, String groups) {
+    WANCommandTestBase testInstance = new WANCommandTestBase(testName);
+
+    Properties config = new Properties();
+    config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    config.setProperty(DistributionConfig.GROUPS_NAME, groups);
+    cache = CacheFactory.create(testInstance.getSystem(config));
+
+    GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+    int receiverPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    receiverFactory.setStartPort(receiverPort);
+    receiverFactory.setEndPort(receiverPort);
+    receiverFactory.setManualStart(true);
+
+    GatewayReceiver createdReceiver = receiverFactory.create();
+    try {
+      createdReceiver.start();
+    } catch (IOException startFailure) {
+      startFailure.printStackTrace();
+      fail("Test " + testInstance.getName() + " failed to start GatewayRecevier on port " + receiverPort);
+    }
+    return receiverPort;
+  }
+
+  /**
+   * Returns the distributed member of the distributed system that
+   * {@link #cache} belongs to.
+   *
+   * @return the member obtained from the cache's distributed system
+   */
+  public static DistributedMember getMember(){
+    return cache.getDistributedSystem().getDistributedMember();
+  }
+
+
+  public static int getLocatorPort(){
+    return Locator.getLocators().get(0).getPort();
+  }
+
+  /**
+   * Enable system property gemfire.disableManagement false in each VM.
+   *
+   * @throws Exception
+   */
+  public void enableManagement() {
+    invokeInEveryVM(new SerializableRunnable("Enable Management") {
+      public void run() {
+        System.setProperty(InternalDistributedSystem.DISABLE_MANAGEMENT_PROPERTY, "false");
+      }
+    });
+
+  }
+
+  /**
+   * Asserts the running and paused state of the gateway sender with the
+   * given id on {@link #cache}.
+   *
+   * Fails the test with an explicit message when no sender with that id
+   * exists; previously this case surfaced as a bare NullPointerException.
+   * "Could not connect" is registered as an expected exception for the
+   * duration of the call and removed again in all cases.
+   *
+   * @param senderId id of the sender to inspect
+   * @param isRunning expected result of {@code isRunning()}
+   * @param isPaused expected result of {@code isPaused()}
+   */
+  public static void verifySenderState(String senderId, boolean isRunning, boolean isPaused) {
+    final ExpectedException exln = addExpectedException("Could not connect");
+    try {
+      AbstractGatewaySender sender = null;
+      for (GatewaySender s : cache.getGatewaySenders()) {
+        if (s.getId().equals(senderId)) {
+          sender = (AbstractGatewaySender) s;
+          break;
+        }
+      }
+      if (sender == null) {
+        fail("No GatewaySender with id " + senderId + " exists in this cache");
+      }
+
+      assertEquals(isRunning, sender.isRunning());
+      assertEquals(isPaused, sender.isPaused());
+    } finally {
+      exln.remove();
+    }
+  }
+
+  /**
+   * Asserts that the gateway sender with the given id on {@link #cache} has
+   * the expected attribute values and, when filter lists are supplied,
+   * contains filters of the expected classes.
+   *
+   * Fails the test with an explicit message when no sender with that id
+   * exists; previously this case surfaced as a bare NullPointerException.
+   *
+   * @param senderId id of the sender to inspect
+   * @param expectedGatewayEventFilters class names of the expected event
+   *        filters, or {@code null} to skip the check
+   * @param expectedGatewayTransportFilters class names of the expected
+   *        transport filters, or {@code null} to skip the check
+   */
+  public static void verifySenderAttributes(String senderId, int remoteDsID,
+      boolean isParallel, boolean manualStart, int socketBufferSize,
+      int socketReadTimeout, boolean enableBatchConflation, int batchSize,
+      int batchTimeInterval, boolean enablePersistence,
+      boolean diskSynchronous, int maxQueueMemory, int alertThreshold,
+      int dispatcherThreads, OrderPolicy orderPolicy,
+      List<String> expectedGatewayEventFilters,
+      List<String> expectedGatewayTransportFilters) {
+
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : cache.getGatewaySenders()) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender)s;
+        break;
+      }
+    }
+    if (sender == null) {
+      fail("No GatewaySender with id " + senderId + " exists in this cache");
+    }
+
+    assertEquals("remoteDistributedSystemId", remoteDsID, sender
+        .getRemoteDSId());
+    assertEquals("isParallel", isParallel, sender.isParallel());
+    assertEquals("manualStart", manualStart, sender.isManualStart());
+    assertEquals("socketBufferSize", socketBufferSize, sender
+        .getSocketBufferSize());
+    assertEquals("socketReadTimeout", socketReadTimeout, sender
+        .getSocketReadTimeout());
+    assertEquals("enableBatchConflation", enableBatchConflation, sender
+        .isBatchConflationEnabled());
+    assertEquals("batchSize", batchSize, sender.getBatchSize());
+    assertEquals("batchTimeInterval", batchTimeInterval, sender
+        .getBatchTimeInterval());
+    assertEquals("enablePersistence", enablePersistence, sender
+        .isPersistenceEnabled());
+    assertEquals("diskSynchronous", diskSynchronous, sender.isDiskSynchronous());
+    assertEquals("maxQueueMemory", maxQueueMemory, sender
+        .getMaximumQueueMemory());
+    assertEquals("alertThreshold", alertThreshold, sender.getAlertThreshold());
+    assertEquals("dispatcherThreads", dispatcherThreads, sender
+        .getDispatcherThreads());
+    assertEquals("orderPolicy", orderPolicy, sender.getOrderPolicy());
+
+    // verify GatewayEventFilters: same count, and every expected class name
+    // must appear among the actual filters' class names
+    if (expectedGatewayEventFilters != null) {
+      assertEquals("gatewayEventFilters", expectedGatewayEventFilters.size(),
+          sender.getGatewayEventFilters().size());
+
+      List<GatewayEventFilter> actualGatewayEventFilters = sender
+          .getGatewayEventFilters();
+      List<String> actualEventFilterClassnames = new ArrayList<String>(
+          actualGatewayEventFilters.size());
+      for (GatewayEventFilter filter : actualGatewayEventFilters) {
+        actualEventFilterClassnames.add(filter.getClass().getName());
+      }
+
+      for (String expectedGatewayEventFilter : expectedGatewayEventFilters) {
+        if (!actualEventFilterClassnames.contains(expectedGatewayEventFilter)) {
+          fail("GatewayEventFilter " + expectedGatewayEventFilter
+              + " is not added to the GatewaySender");
+        }
+      }
+    }
+
+    // verify GatewayTransportFilters: same scheme as for the event filters
+    if (expectedGatewayTransportFilters != null) {
+      assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters
+          .size(), sender.getGatewayTransportFilters().size());
+      List<GatewayTransportFilter> actualGatewayTransportFilters = sender
+          .getGatewayTransportFilters();
+      List<String> actualTransportFilterClassnames = new ArrayList<String>(
+          actualGatewayTransportFilters.size());
+      for (GatewayTransportFilter filter : actualGatewayTransportFilters) {
+        actualTransportFilterClassnames.add(filter.getClass().getName());
+      }
+
+      for (String expectedGatewayTransportFilter : expectedGatewayTransportFilters) {
+        if (!actualTransportFilterClassnames
+            .contains(expectedGatewayTransportFilter)) {
+          fail("GatewayTransportFilter " + expectedGatewayTransportFilter
+              + " is not added to the GatewaySender.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that every gateway receiver registered on {@link #cache} reports
+   * the given running state.
+   *
+   * @param isRunning expected result of {@code isRunning()} for each receiver
+   */
+  public static void verifyReceiverState(boolean isRunning) {
+    // The instance is not used afterwards; the construction is kept because
+    // the original performs it as well.
+    new WANCommandTestBase(testName);
+    for (GatewayReceiver each : cache.getGatewayReceivers()) {
+      assertEquals(isRunning, each.isRunning());
+    }
+  }
+
+  /**
+   * Asserts that {@link #cache} holds exactly one gateway receiver and that
+   * it has the expected attribute values. When a transport filter list is
+   * supplied, the receiver must have the same number of filters and every
+   * expected class name must be among them.
+   *
+   * @param expectedGatewayTransportFilters class names of the expected
+   *        transport filters, or {@code null} to skip the check
+   */
+  public static void verifyReceiverCreationWithAttributes(boolean isRunning,
+      int startPort, int endPort, String bindAddress, int maxTimeBetweenPings,
+      int socketBufferSize, List<String> expectedGatewayTransportFilters) {
+
+    // The instance is not used afterwards; the construction is kept because
+    // the original performs it as well.
+    new WANCommandTestBase(testName);
+    Set<GatewayReceiver> allReceivers = cache.getGatewayReceivers();
+    assertEquals("Number of receivers is incorrect", 1, allReceivers.size());
+
+    for (GatewayReceiver current : allReceivers) {
+      assertEquals("isRunning", isRunning, current.isRunning());
+      assertEquals("startPort", startPort, current.getStartPort());
+      assertEquals("endPort", endPort, current.getEndPort());
+      assertEquals("bindAddress", bindAddress, current.getBindAddress());
+      assertEquals("maximumTimeBetweenPings", maxTimeBetweenPings,
+          current.getMaximumTimeBetweenPings());
+      assertEquals("socketBufferSize", socketBufferSize,
+          current.getSocketBufferSize());
+
+      if (expectedGatewayTransportFilters == null) {
+        // No filter expectations were given for this receiver.
+        continue;
+      }
+
+      assertEquals("gatewayTransportFilters",
+          expectedGatewayTransportFilters.size(),
+          current.getGatewayTransportFilters().size());
+
+      List<GatewayTransportFilter> installed = current.getGatewayTransportFilters();
+      List<String> installedClassNames = new ArrayList<String>(installed.size());
+      for (GatewayTransportFilter transportFilter : installed) {
+        installedClassNames.add(transportFilter.getClass().getName());
+      }
+
+      for (String expectedName : expectedGatewayTransportFilters) {
+        if (installedClassNames.contains(expectedName)) {
+          continue;
+        }
+        fail("GatewayTransportFilter " + expectedName
+            + " is not added to the GatewayReceiver.");
+      }
+    }
+  }
+
+  public void tearDown2() throws Exception {
+    super.tearDown2();
+    closeCache();
+    vm0.invoke(WANCommandTestBase.class, "closeCache");
+    vm1.invoke(WANCommandTestBase.class, "closeCache");
+    vm2.invoke(WANCommandTestBase.class, "closeCache");
+    vm3.invoke(WANCommandTestBase.class, "closeCache");
+    vm4.invoke(WANCommandTestBase.class, "closeCache");
+    vm5.invoke(WANCommandTestBase.class, "closeCache");
+    vm6.invoke(WANCommandTestBase.class, "closeCache");
+    vm7.invoke(WANCommandTestBase.class, "closeCache");
+  }
+
+  /**
+   * Closes {@link #cache} and disconnects its distributed system. Does
+   * nothing when the cache was never created or is already closed.
+   */
+  public static void closeCache() {
+    if (cache == null || cache.isClosed()) {
+      return;
+    }
+    cache.close();
+    cache.getDistributedSystem().disconnect();
+  }
+}



Mime
View raw message