geode-commits mailing list archives

From upthewatersp...@apache.org
Subject [22/70] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Thu, 28 Jan 2016 18:13:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
new file mode 100644
index 0000000..751a697
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -0,0 +1,5143 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.AttributesMutator;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener;
+import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.FileUtil;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.CacheConfig;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.CustomerIDPartitionResolver;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.execute.data.CustId;
+import com.gemstone.gemfire.internal.cache.execute.data.Customer;
+import com.gemstone.gemfire.internal.cache.execute.data.Order;
+import com.gemstone.gemfire.internal.cache.execute.data.OrderId;
+import com.gemstone.gemfire.internal.cache.execute.data.Shipment;
+import com.gemstone.gemfire.internal.cache.execute.data.ShipmentId;
+import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+import com.gemstone.gemfire.pdx.SimpleClass;
+import com.gemstone.gemfire.pdx.SimpleClass1;
+import com.gemstone.gemfire.util.test.TestUtil;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+import junit.framework.Assert;
+
+public class WANTestBase extends DistributedTestCase {
+
+  protected static Cache cache;
+  protected static Region region;
+  
+  protected static PartitionedRegion customerRegion;
+  protected static PartitionedRegion orderRegion;
+  protected static PartitionedRegion shipmentRegion;
+  
+  protected static final String customerRegionName = "CUSTOMER";
+  protected static final String orderRegionName = "ORDER";
+  protected static final String shipmentRegionName = "SHIPMENT";
+  
+  protected static VM vm0;
+  protected static VM vm1;
+  protected static VM vm2;
+  protected static VM vm3;
+  protected static VM vm4;
+  protected static VM vm5;
+  protected static VM vm6;
+  protected static VM vm7;
+  
+  protected static QueueListener listener1;
+  protected static QueueListener listener2;
+  
+  protected static List<QueueListener> gatewayListeners;
+  
+  protected static AsyncEventListener eventListener1 ;
+  protected static AsyncEventListener eventListener2 ;
+
+  private static final long MAX_WAIT = 10000;
+  
+  protected static GatewayEventFilter eventFilter;
+  
+  protected static boolean destroyFlag = false;
+  
+  protected static List<Integer> dispatcherThreads =
+      new ArrayList<Integer>(Arrays.asList(1, 3, 5));
+  // this will be set for each test method run with one of the values from the above list
+  protected static int numDispatcherThreadsForTheRun = 1;
+  
+  public WANTestBase(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    final Host host = Host.getHost(0);
+    vm0 = host.getVM(0); 
+    vm1 = host.getVM(1);
+    vm2 = host.getVM(2);
+    vm3 = host.getVM(3); 
+    vm4 = host.getVM(4);
+    vm5 = host.getVM(5);
+    vm6 = host.getVM(6); 
+    vm7 = host.getVM(7);
+    //Need to set the test name after the VMs are created
+    super.setUp();
+    //this is done to vary the number of dispatcher threads for the sender
+    //during every test method run
+    shuffleNumDispatcherThreads();
+    invokeInEveryVM(WANTestBase.class, "setNumDispatcherThreadsForTheRun",
+        new Object[] { dispatcherThreads.get(0) });
+    addExpectedException("Connection refused");
+    addExpectedException("Software caused connection abort");
+    addExpectedException("Connection reset");
+  }
+  
+  public static void shuffleNumDispatcherThreads() {
+    Collections.shuffle(dispatcherThreads);
+  }
+  
+  public static void setNumDispatcherThreadsForTheRun(int numThreads) {
+    numDispatcherThreadsForTheRun = numThreads;
+  }
+  
+  public static void stopOldLocator() {
+    if (Locator.hasLocator()) {
+      Locator.getLocator().stop();
+    }
+  }
+
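+  // Starts a locator for distributed system dsId on the given port, wiring in the
+  // supplied local and remote locator sets (the brackets and spaces from the
+  // Set.toString() output are stripped to build the locators properties).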
+  public static void createLocator(int dsId, int port, Set<String> localLocatorsList, Set<String> remoteLocatorsList){
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    StringBuffer localLocatorBuffer = new StringBuffer(localLocatorsList.toString());
+    localLocatorBuffer.deleteCharAt(0);
+    localLocatorBuffer.deleteCharAt(localLocatorBuffer.lastIndexOf("]"));
+    String localLocator = localLocatorBuffer.toString();
+    localLocator = localLocator.replace(" ", ""); 
+    
+    props.setProperty(DistributionConfig.LOCATORS_NAME, localLocator);
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    StringBuffer remoteLocatorBuffer = new StringBuffer(remoteLocatorsList.toString());
+    remoteLocatorBuffer.deleteCharAt(0);
+    remoteLocatorBuffer.deleteCharAt(remoteLocatorBuffer.lastIndexOf("]"));
+    String remoteLocator = remoteLocatorBuffer.toString();
+    remoteLocator = remoteLocator.replace(" ", ""); 
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, remoteLocator);
+    test.getSystem(props);
+  }
+  
+  public static Integer createFirstLocatorWithDSId(int dsId) {
+    stopOldLocator();
+    WANTestBase test = new WANTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    test.getSystem(props);
+    return port;
+  }
+  
+  public static Integer createFirstPeerLocator(int dsId) {
+    stopOldLocator();
+    WANTestBase test = new WANTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
+    test.getSystem(props);
+    return port;
+  }
+  
+  public static Integer createSecondLocator(int dsId, int locatorPort) {
+    stopOldLocator();
+    WANTestBase test = new WANTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locatorPort + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    test.getSystem(props);
+    return port;
+  }
+
+  public static Integer createSecondPeerLocator(int dsId, int locatorPort) {
+    stopOldLocator();
+    WANTestBase test = new WANTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locatorPort + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
+    test.getSystem(props);
+    return port;
+  }
+  
+  public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+    stopOldLocator();
+    WANTestBase test = new WANTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
+    test.getSystem(props);
+    return port;
+  }
+  
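+  // Restarts a locator on the port it previously used, keeping the remote-locators
+  // configuration pointing at remoteLocPort.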
+  public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) {
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.put(DistributionConfig.LOG_LEVEL_NAME, "fine");
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + oldPort + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost");
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
+    test.getSystem(props);
+    return;
+  }
+  
+  
+  public static Integer createFirstRemotePeerLocator(int dsId, int remoteLocPort) {
+    stopOldLocator();
+    WANTestBase test = new WANTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
+    test.getSystem(props);
+    return port;
+  }
+  
+  public static Integer createSecondRemoteLocator(int dsId, int localPort,
+      int remoteLocPort) {
+    stopOldLocator();
+    WANTestBase test = new WANTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + localPort + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
+    test.getSystem(props);
+    return port;
+  }
+  
+  public static Integer createSecondRemotePeerLocator(int dsId, int localPort,
+      int remoteLocPort) {
+    stopOldLocator();
+    WANTestBase test = new WANTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + localPort + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
+    test.getSystem(props);
+    return port;
+  }
+  
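+  // senderIds is a comma-separated list of gateway sender ids to attach to the
+  // replicated region being created.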
+  public static void createReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(InterruptedException.class
+        .getName());
+    ExpectedException exp2 = addExpectedException(GatewaySenderException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      fact.setDataPolicy(DataPolicy.REPLICATE);
+      fact.setScope(Scope.DISTRIBUTED_ACK);
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create()).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+      exp2.remove();
+    }
+  }
+
+  public static void createNormalRegion(String regionName, String senderIds){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+//        GatewaySender sender = cache.getGatewaySender(senderId);
+//        assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    fact.setDataPolicy(DataPolicy.NORMAL);
+    fact.setScope(Scope.DISTRIBUTED_ACK);
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+  
+//  public static void createReplicatedRegion_PDX(String regionName, String senderId, DataPolicy policy, InterestPolicy intPolicy){
+//    AttributesFactory fact = new AttributesFactory();
+//    if(senderId!= null){
+//      StringTokenizer tokenizer = new StringTokenizer(senderId, ",");
+//      while (tokenizer.hasMoreTokens()){
+//        String sender = tokenizer.nextToken();
+//        //fact.addSerialGatewaySenderId(sender);
+//      }
+//    }
+//    fact.setDataPolicy(policy);
+//    SubscriptionAttributes subAttr = new SubscriptionAttributes(intPolicy);
+//    fact.setSubscriptionAttributes(subAttr);
+//    fact.setScope(Scope.DISTRIBUTED_ACK);
+//    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+//    assertNotNull(r);
+//    assertTrue(r.size() == 0);
+//  }
+  
+  public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+//        GatewaySender sender = cache.getGatewaySender(senderId);
+//        assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    fact.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+    fact.setOffHeap(offHeap);
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+  
+//  public static void createReplicatedRegionWithParallelSenderId(String regionName, String senderId){
+//    AttributesFactory fact = new AttributesFactory();
+//    if(senderId!= null){
+//      StringTokenizer tokenizer = new StringTokenizer(senderId, ",");
+//      while (tokenizer.hasMoreTokens()){
+//        String sender = tokenizer.nextToken();
+//        //fact.addParallelGatewaySenderId(sender);
+//      }
+//    }
+//    fact.setDataPolicy(DataPolicy.REPLICATE);
+//    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+//    assertNotNull(r);
+//  }
+  
+//  public static void createReplicatedRegion(String regionName){
+//    AttributesFactory fact = new AttributesFactory();
+//    fact.setDataPolicy(DataPolicy.REPLICATE);
+//    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+//    assertNotNull(r);
+//  }
+  
+  public static void createReplicatedRegionWithAsyncEventQueue(
+      String regionName, String asyncQueueIds, Boolean offHeap) {
+    ExpectedException exp1 = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (asyncQueueIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String asyncQueueId = tokenizer.nextToken();
+          fact.addAsyncEventQueueId(asyncQueueId);
+        }
+      }
+      fact.setDataPolicy(DataPolicy.REPLICATE);
+      fact.setOffHeap(offHeap);
+      RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+      Region r = regionFactory.create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp1.remove();
+    }
+  }
+  
+  public static void createPersistentReplicatedRegionWithAsyncEventQueue(
+      String regionName, String asyncQueueIds) {
+        
+    AttributesFactory fact = new AttributesFactory();
+    if(asyncQueueIds != null){
+      StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String asyncQueueId = tokenizer.nextToken();
+        fact.addAsyncEventQueueId(asyncQueueId);
+      }
+    }
+    fact.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+    RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+    Region r = regionFactory.create(regionName);
+    assertNotNull(r);
+  }
+  
+  
+  
+  public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
+      String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      fact.setDataPolicy(DataPolicy.REPLICATE);
+      fact.setScope(Scope.DISTRIBUTED_ACK);
+      fact.setOffHeap(offHeap);
+      RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+      regionFactory.addAsyncEventQueueId(asyncChannelId);
+      Region r = regionFactory.create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+    }
+  }
+  
+  public static void createReplicatedRegion(String regionName, String senderIds, Scope scope, DataPolicy policy, Boolean offHeap){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+//        GatewaySender sender = cache.getGatewaySender(senderId);
+//        assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    fact.setDataPolicy(policy);
+    fact.setScope(scope);
+    fact.setOffHeap(offHeap);
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+  
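+  // Creates an AsyncEventQueue that uses a MyAsyncEventListener; if diskStoreName is
+  // non-null, a disk store is first created in a fresh per-VM directory.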
+  public static void createAsyncEventQueue(
+      String asyncChannelId, boolean isParallel, 
+      Integer maxMemory, Integer batchSize, boolean isConflation, 
+      boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) {
+    
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+    
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+    
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    //set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
+  }
+  
+  public static void createAsyncEventQueueWithListener2(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isPersistent, String diskStoreName) {
+    
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+    
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    //set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+  }
+  
+  public static void createAsyncEventQueue(
+    String asyncChannelId, boolean isParallel, Integer maxMemory, 
+    Integer batchSize, boolean isConflation, boolean isPersistent, 
+    String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+
+    String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
+    String className = packagePrefix + asyncListenerClass;
+    AsyncEventListener asyncEventListener = null;
+    try {
+      Class clazz = Class.forName(className);
+      asyncEventListener = (AsyncEventListener) clazz.newInstance();
+    } catch (ClassNotFoundException e) {
+      throw e;
+    } catch (InstantiationException e) {
+      throw e;
+    } catch (IllegalAccessException e) {
+      throw e;
+    }
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    //set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
+  }
+  
+  public static void createAsyncEventQueueWithCustomListener(
+      String asyncChannelId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      String diskStoreName, boolean isDiskSynchronous) {
+    createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, maxMemory, batchSize,
+        isConflation, isPersistent, diskStoreName, isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS);
+  }
+  
+  public static void createAsyncEventQueueWithCustomListener(
+      String asyncChannelId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      String diskStoreName, boolean isDiskSynchronous, int nDispatchers) {
+
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+
+    try {
+      if (diskStoreName != null) {
+        File directory = new File(asyncChannelId + "_disk_"
+            + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+        directory.mkdir();
+        File[] dirs1 = new File[] { directory };
+        DiskStoreFactory dsf = cache.createDiskStoreFactory();
+        dsf.setDiskDirs(dirs1);
+        DiskStore ds = dsf.create(diskStoreName);
+      }
+
+      AsyncEventListener asyncEventListener = new CustomAsyncEventListener();
+
+      AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+      factory.setBatchSize(batchSize);
+      factory.setPersistent(isPersistent);
+      factory.setDiskStoreName(diskStoreName);
+      factory.setMaximumQueueMemory(maxMemory);
+      factory.setParallel(isParallel);
+      factory.setDispatcherThreads(nDispatchers);
+      AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+          asyncEventListener);
+    } finally {
+      exp.remove();
+    }
+  }
+
+  public static void createConcurrentAsyncEventQueue(
+      String asyncChannelId, boolean isParallel, 
+      Integer maxMemory, Integer batchSize, boolean isConflation, 
+      boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
+      int dispatcherThreads, OrderPolicy policy) {
+    
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+    
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+    
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    factory.setDispatcherThreads(dispatcherThreads);
+    factory.setOrderPolicy(policy);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
+  }
+  
+  
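+  // Creates an AsyncEventQueue, optionally persistent; the name of the directory
+  // chosen for the backing disk store is returned to the caller.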
+  public static String createAsyncEventQueueWithDiskStore(
+      String asyncChannelId, boolean isParallel, 
+      Integer maxMemory, Integer batchSize, 
+      boolean isPersistent, String diskStoreName) {
+    
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+    
+    File persistentDirectory = null;
+    if (diskStoreName == null) {
+      persistentDirectory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    } else {
+      persistentDirectory = new File(diskStoreName); 
+    }
+    getLogWriter().info("The disk store directory is : " + persistentDirectory.getName());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File [] dirs1 = new File[] {persistentDirectory};
+    
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setParallel(isParallel);
+    if (isPersistent) {
+      factory.setPersistent(isPersistent);
+      factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId).getName());
+    }
+    factory.setMaximumQueueMemory(maxMemory);
+    //set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
+    return persistentDirectory.getName();
+  }
+  
+  public static void pauseAsyncEventQueue(String asyncChannelId) {
+    AsyncEventQueue theChannel = null;
+    
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+      }
+    }
+    
+    ((AsyncEventQueueImpl)theChannel).getSender().pause();
+ }
+  
+  public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(String asyncChannelId) {
+    AsyncEventQueue theChannel = null;
+    
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+        break;
+      }
+    }
+    
+    ((AsyncEventQueueImpl)theChannel).getSender().pause();
+    
+    
+    ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender()).getEventProcessor().waitForDispatcherToPause();
+  }
+  
+ public static void resumeAsyncEventQueue(String asyncQueueId) {
+    AsyncEventQueue theQueue = null;
+    
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theQueue = asyncChannel;
+      }
+    }
+    
+    ((AsyncEventQueueImpl)theQueue).getSender().resume();
+  }
+  
+  
+  public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
+    AsyncEventQueue theAsyncEventQueue = null;
+    
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
+      }
+    }
+    
+    GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue).getSender();
+    
+    if (sender.isParallel()) {
+      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+      assertEquals(numQueueEntries,
+          queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size());
+    } else {
+      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+      int size = 0;
+      for (RegionQueue q : queues) {
+        size += q.size();
+      }
+      assertEquals(numQueueEntries, size);
+    }
+  }
+  
+  /**
+   * Verifies the queue size of a ParallelGatewaySender. For a
+   * ParallelGatewaySender, conflation happens in a separate thread, so the test
+   * code needs to wait for some time for the expected result.
+   * 
+   * @param asyncQueueId
+   *          Async Queue ID
+   * @param numQueueEntries
+   *          expected number of Queue entries
+   * @throws Exception
+   */
+  public static void waitForAsyncEventQueueSize(String asyncQueueId,
+      final int numQueueEntries) throws Exception {
+    AsyncEventQueue theAsyncEventQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
+      }
+    }
+
+    GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue)
+        .getSender();
+
+    if (sender.isParallel()) {
+      final Set<RegionQueue> queues = ((AbstractGatewaySender) sender)
+          .getQueues();
+
+      waitForCriterion(new WaitCriterion() {
+
+        public String description() {
+          return "Waiting for EventQueue size to be " + numQueueEntries;
+        }
+
+        public boolean done() {
+          boolean done = numQueueEntries == queues
+              .toArray(new RegionQueue[queues.size()])[0].getRegion().size();
+          return done;
+        }
+
+      }, MAX_WAIT, 500, true);
+
+    } else {
+      throw new Exception(
+          "This method should be used only for ParallelGatewaySender; for SerialGatewaySender, use checkAsyncEventQueueSize() instead");
+
+    }
+  }
+  
+  public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setRedundantCopies(redundantCopies);
+      pfact.setRecoveryDelay(0);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create()).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
+  // TODO:OFFHEAP: add offheap flavor
+  public static void createPartitionedRegionWithPersistence(String regionName,
+      String senderIds, Integer redundantCopies, Integer totalNumBuckets) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setRedundantCopies(redundantCopies);
+      pfact.setRecoveryDelay(0);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+      Region r = cache.createRegionFactory(fact.create()).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  public static void createColocatedPartitionedRegion(String regionName,
+      String senderIds, Integer redundantCopies, Integer totalNumBuckets, String colocatedWith) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setRedundantCopies(redundantCopies);
+      pfact.setRecoveryDelay(0);
+      pfact.setColocatedWith(colocatedWith);
+      fact.setPartitionAttributes(pfact.create());
+      Region r = cache.createRegionFactory(fact.create()).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
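+  // Attaches a gateway sender id to an already-created region at runtime through
+  // its AttributesMutator.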
+  public static void addSenderThroughAttributesMutator(String regionName,
+      String senderIds){
+    final Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    AttributesMutator mutator = r.getAttributesMutator();
+    mutator.addGatewaySenderId(senderIds);
+  }
+  
+  public static void addAsyncEventQueueThroughAttributesMutator(
+      String regionName, String queueId) {
+    final Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    AttributesMutator mutator = r.getAttributesMutator();
+    mutator.addAsyncEventQueueId(queueId);
+  }
+  
+  public static void createPartitionedRegionWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
+  public static void createColocatedPartitionedRegionWithAsyncEventQueue(
+    String regionName, String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) {
+
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setColocatedWith(colocatedWith);
+      fact.setPartitionAttributes(pfact.create());
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
+  public static void createPersistentPartitionedRegionWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId) {
+    AttributesFactory fact = new AttributesFactory();
+
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+    pfact.setTotalNumBuckets(16);
+    fact.setPartitionAttributes(pfact.create());
+    if (asyncEventQueueId != null) {
+      StringTokenizer tokenizer = new StringTokenizer(asyncEventQueueId, ",");
+      while (tokenizer.hasMoreTokens()) {
+        String asyncId = tokenizer.nextToken();
+        fact.addAsyncEventQueueId(asyncId);
+      }
+    }
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+  
+  /**
+   * Create PartitionedRegion with 1 redundant copy
+   */
+  public static void createPRWithRedundantCopyWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      pfact.setRedundantCopies(1);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+    }
+  }
+  
+  public static void createPartitionedRegionAccessorWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId) {
+    AttributesFactory fact = new AttributesFactory();
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(16);
+    pfact.setLocalMaxMemory(0);
+    fact.setPartitionAttributes(pfact.create());
+    Region r = cache.createRegionFactory(fact.create())
+        .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+    assertNotNull(r);
+  }
+  
+  public static void createPartitionedRegionAsAccessor(
+      String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+//        GatewaySender sender = cache.getGatewaySender(senderId);
+//        assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(totalNumBuckets);
+    pfact.setRedundantCopies(redundantCopies);
+    pfact.setLocalMaxMemory(0);
+    fact.setPartitionAttributes(pfact.create());
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+  
+  public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName, String serialSenderIds, String parallelSenderIds, String colocatedWith, Boolean offHeap){
+    AttributesFactory fact = new AttributesFactory();
+    if (serialSenderIds != null) {
+      StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ",");
+      while (tokenizer.hasMoreTokens()) {
+        String senderId = tokenizer.nextToken();
+        // GatewaySender sender = cache.getGatewaySender(senderId);
+        // assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    if (parallelSenderIds != null) {
+      StringTokenizer tokenizer = new StringTokenizer(parallelSenderIds, ",");
+      while (tokenizer.hasMoreTokens()) {
+        String senderId = tokenizer.nextToken();
+//        GatewaySender sender = cache.getGatewaySender(senderId);
+//        assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setColocatedWith(colocatedWith);
+    fact.setPartitionAttributes(pfact.create());
+    fact.setOffHeap(offHeap);
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+  
+  public static void createPersistentPartitionedRegion(
+      String regionName, 
+      String senderIds, 
+      Integer redundantCopies, 
+      Integer totalNumBuckets,
+      Boolean offHeap){
+    
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setRedundantCopies(redundantCopies);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create()).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
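+  // Creates the colocated CUSTOMER -> ORDER -> SHIPMENT partitioned regions, each
+  // partitioned by CustomerIDPartitionResolver and carrying the given sender ids.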
+  public static void createCustomerOrderShipmentPartitionedRegion(
+      String regionName, String senderIds, Integer redundantCopies,
+      Integer totalNumBuckets, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+
+      // creating colocated Regions
+      PartitionAttributesFactory paf = new PartitionAttributesFactory();
+      paf.setRedundantCopies(redundantCopies)
+          .setTotalNumBuckets(totalNumBuckets)
+          .setPartitionResolver(
+              new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+      fact.setPartitionAttributes(paf.create());
+      fact.setOffHeap(offHeap);
+      customerRegion = (PartitionedRegion)cache.createRegionFactory(
+          fact.create()).create(customerRegionName);
+      assertNotNull(customerRegion);
+      getLogWriter().info(
+          "Partitioned Region CUSTOMER created Successfully :"
+              + customerRegion.toString());
+
+      paf = new PartitionAttributesFactory();
+      paf.setRedundantCopies(redundantCopies)
+          .setTotalNumBuckets(totalNumBuckets)
+          .setColocatedWith(customerRegionName)
+          .setPartitionResolver(
+              new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+      fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      fact.setPartitionAttributes(paf.create());
+      fact.setOffHeap(offHeap);
+      orderRegion = (PartitionedRegion)cache.createRegionFactory(fact.create())
+          .create(orderRegionName);
+      assertNotNull(orderRegion);
+      getLogWriter().info(
+          "Partitioned Region ORDER created Successfully :"
+              + orderRegion.toString());
+
+      paf = new PartitionAttributesFactory();
+      paf.setRedundantCopies(redundantCopies)
+          .setTotalNumBuckets(totalNumBuckets)
+          .setColocatedWith(orderRegionName)
+          .setPartitionResolver(
+              new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+      fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      fact.setPartitionAttributes(paf.create());
+      fact.setOffHeap(offHeap);
+      shipmentRegion = (PartitionedRegion)cache.createRegionFactory(
+          fact.create()).create(shipmentRegionName);
+      assertNotNull(shipmentRegion);
+      getLogWriter().info(
+          "Partitioned Region SHIPMENT created Successfully :"
+              + shipmentRegion.toString());
+    }
+    finally {
+      exp.remove();
+    }
+  }
+  
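+  // Creates a parent partitioned region and two children ("_child1", "_child2")
+  // colocated with it; all three share the given gateway sender ids.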
+  public static void createColocatedPartitionedRegions(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+//        GatewaySender sender = cache.getGatewaySender(senderId);
+//        assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(totalNumBuckets);
+    pfact.setRedundantCopies(redundantCopies);
+    fact.setPartitionAttributes(pfact.create());
+    fact.setOffHeap(offHeap);
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+    
+    pfact.setColocatedWith(r.getName());
+    fact.setPartitionAttributes(pfact.create());
+    fact.setOffHeap(offHeap);
+    Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1");
+    assertNotNull(r1);    
+    
+    Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2");
+    assertNotNull(r2);
+  }
+  
+  public static void createColocatedPartitionedRegions2 (String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+//        GatewaySender sender = cache.getGatewaySender(senderId);
+//        assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(totalNumBuckets);
+    pfact.setRedundantCopies(redundantCopies);
+    fact.setPartitionAttributes(pfact.create());
+    fact.setOffHeap(offHeap);
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+    
+    
+    fact = new AttributesFactory();
+    pfact.setColocatedWith(r.getName());
+    fact.setPartitionAttributes(pfact.create());
+    fact.setOffHeap(offHeap);
+    Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1");
+    assertNotNull(r1);    
+    
+    Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2");
+    assertNotNull(r2);
+  }
+  
+  public static void createCache(Integer locPort){
+    createCache(false, locPort);
+  }
+  public static void createManagementCache(Integer locPort){
+    createCache(true, locPort);
+  }
+  
+  public static void createCacheConserveSockets(Boolean conserveSockets,Integer locPort){
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    props.setProperty(DistributionConfig.CONSERVE_SOCKETS_NAME, conserveSockets.toString());   
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+  }
+  
+  protected static void createCache(boolean management, Integer locPort) {
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    if (management) {
+      props.setProperty(DistributionConfig.JMX_MANAGER_NAME, "true");
+      props.setProperty(DistributionConfig.JMX_MANAGER_START_NAME, "false");
+      props.setProperty(DistributionConfig.JMX_MANAGER_PORT_NAME, "0");
+      props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0");
+    }
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);    
+  }
+  
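+  /**
+   * Creates a cache whose distributed system has gateway SSL enabled, using
+   * the client keystore/truststore bundled with the tests, "any" protocols
+   * and ciphers, and SSL client authentication required.
+   */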
+  protected static void createCacheWithSSL(Integer locPort) {
+    WANTestBase test = new WANTestBase(testName);
+
+    boolean gatewaySslenabled = true;
+    String  gatewaySslprotocols = "any";
+    String  gatewaySslciphers = "any";
+    boolean gatewaySslRequireAuth = true;
+    
+    Properties gemFireProps = new Properties();
+    gemFireProps.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_ENABLED_NAME, String.valueOf(gatewaySslenabled));
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_PROTOCOLS_NAME, gatewaySslprotocols);
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_CIPHERS_NAME, gatewaySslciphers);
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_REQUIRE_AUTHENTICATION_NAME, String.valueOf(gatewaySslRequireAuth));
+    
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_TYPE_NAME, "jks");
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME, 
+        TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.keystore"));
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_PASSWORD_NAME, "password");
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME, 
+        TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.truststore"));
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_PASSWORD_NAME, "password");
+    
+    gemFireProps.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    gemFireProps.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    
+    getLogWriter().info("Starting cache ds with the following properties \n" + gemFireProps);
+    
+    InternalDistributedSystem ds = test.getSystem(gemFireProps);
+    cache = CacheFactory.create(ds);    
+  }
+  
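+  /**
+   * Creates a cache with a persistent PDX registry backed by the "PDX_TEST"
+   * disk store, which is placed in a "pdx" subdirectory of the dunit disk dir.
+   */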
+  public static void createCache_PDX(Integer locPort){
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    CacheConfig cacheConfig = new CacheConfig();
+    cacheConfig.setPdxPersistent(true);
+    cacheConfig.setPdxDiskStore("PDX_TEST");
+    cache = GemFireCacheImpl.create(ds, false, cacheConfig);
+    
+    File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File [] dirs1 = new File[] {pdxDir};
+    DiskStore store = dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
+  }
+  
+  public static void createCache(Integer locPort1, Integer locPort2){
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort1
+        + "],localhost[" + locPort2 + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+  }
+  
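+  /**
+   * Creates a cache that uses multicast on the given port for membership
+   * instead of a locator.
+   */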
+  public static void createCacheWithoutLocator(Integer mCastPort){
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, ""+mCastPort);
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+  }
+  
+  /**
+   * Creates a bridge (cache) server on a random available port.
+   * 
+   * @return    the port on which the server was started
+   */
+  public static Integer createCacheServer() {
+    CacheServer server1 = cache.addCacheServer();
+    assertNotNull(server1);
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    server1.setPort(port);
+    try {
+      server1.start();
+    }
+    catch (IOException e) {
+      fail("Failed to start the Server", e);
+    }
+    assertTrue(server1.isRunning());
+
+    return new Integer(server1.getPort());
+  }
+  
+  /**
+   * Returns a Map containing the number of bridge servers and the number of
+   * gateway receivers in this cache.
+   * 
+   * @return    Map with keys "BridgeServer" and "ReceiverServer"
+   */
+  public static Map getCacheServers() {
+    List cacheServers = cache.getCacheServers();
+    
+    Map cacheServersMap = new HashMap();
+    Iterator itr = cacheServers.iterator();
+    int bridgeServerCounter = 0;
+    int receiverServerCounter = 0;
+    while (itr.hasNext()) {
+      CacheServerImpl cacheServer = (CacheServerImpl) itr.next();
+      if (cacheServer.getAcceptor().isGatewayReceiver()) {
+        receiverServerCounter++;
+      } else {
+        bridgeServerCounter++;
+      }
+    }
+    cacheServersMap.put("BridgeServer", bridgeServerCounter);
+    cacheServersMap.put("ReceiverServer", receiverServerCounter);
+    return cacheServersMap;
+  }
+  
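+  /**
+   * Starts the gateway sender with the given id. Connection failures,
+   * ForceReattemptException and InterruptedException are registered as
+   * expected exceptions while the sender starts and are deregistered
+   * afterwards.
+   */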
+  public static void startSender(String senderId) {
+    final ExpectedException exln = addExpectedException("Could not connect");
+
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(InterruptedException.class
+        .getName());
+    try {
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      GatewaySender sender = null;
+      for (GatewaySender s : senders) {
+        if (s.getId().equals(senderId)) {
+          sender = s;
+          break;
+        }
+      }
+      sender.start();
+    } finally {
+      exp.remove();
+      exp1.remove();
+      exln.remove();
+    }
+
+  }
+  
+  public static void enableConflation(String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender)s;
+        break;
+      }
+    }
+    sender.test_setBatchConflationEnabled(true);
+  }
+  
+  public static void startAsyncEventQueue(String senderId) {
+    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
+    AsyncEventQueue q = null;
+    for (AsyncEventQueue s : queues) {
+      if (s.getId().equals(senderId)) {
+        q = s;
+        break;
+      }
+    }
+    //merge42180: There is no start method on AsyncEventQueue. Cheetah has this method, but the AsyncEventQueue code has not yet been properly merged from cheetah to cedar.
+    //q.start();
+  }
+  
+  /*
+  public static Map getSenderToReceiverConnectionInfo(String senderId){	
+	  Set<GatewaySender> senders = cache.getGatewaySenders();
+	  GatewaySender sender = null;
+	  for(GatewaySender s : senders){
+		  if(s.getId().equals(senderId)){
+			  sender = s;
+			  break;
+	      }
+	  }
+	  Map connectionInfo = null;
+	  if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) {
+		  connectionInfo = new HashMap();
+		  GatewaySenderEventDispatcher dispatcher = 
+			  ((AbstractGatewaySender)sender).getEventProcessor().getDispatcher();
+		  if (dispatcher instanceof GatewaySenderEventRemoteDispatcher) {
+			  ServerLocation serverLocation = 
+				  ((GatewaySenderEventRemoteDispatcher) dispatcher).getConnection().getServer();
+			  connectionInfo.put("serverHost", serverLocation.getHostName());
+			  connectionInfo.put("serverPort", serverLocation.getPort());
+			  
+		  }
+	  }
+	  return connectionInfo;
+  }
+  */
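+  /**
+   * If expectedQueueSize is not -1, waits (up to two minutes) for the sender's
+   * region queue to reach that size, then returns the sender statistics in the
+   * order: queue size, events received, events queued, events distributed,
+   * batches distributed, batches redistributed, events filtered, events not
+   * queued because they were conflated, events conflated from batches.
+   *
+   * A typical (hypothetical) use from a test, with an illustrative sender id:
+   *   vm4.invoke(WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
+   */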
+  public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender)s;
+        break;
+      }
+    }
+    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+    if (expectedQueueSize != -1) {
+      final RegionQueue regionQueue;
+      regionQueue = sender.getQueues().toArray(
+          new RegionQueue[1])[0];
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          if (regionQueue.size() == expectedQueueSize) {
+            return true;
+          }
+          return false;
+        }
+
+        public String description() {
+          return "Expected queue entries: " + expectedQueueSize
+              + " but actual entries: " + regionQueue.size();
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 120000, 500, true);
+    }
+    ArrayList<Integer> stats = new ArrayList<Integer>();
+    stats.add(statistics.getEventQueueSize());
+    stats.add(statistics.getEventsReceived());
+    stats.add(statistics.getEventsQueued());
+    stats.add(statistics.getEventsDistributed());
+    stats.add(statistics.getBatchesDistributed());
+    stats.add(statistics.getBatchesRedistributed());
+    stats.add(statistics.getEventsFiltered());
+    stats.add(statistics.getEventsNotQueuedConflated());
+    stats.add(statistics.getEventsConflatedFromBatches());
+    return stats;
+  }
+      
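+  /**
+   * Asserts the exact queue size, events received and events queued for the
+   * sender; eventsDistributed is checked only as a lower bound.
+   */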
+  public static void checkQueueStats(String senderId, final int queueSize,
+      final int eventsReceived, final int eventsQueued,
+      final int eventsDistributed) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    
+    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+    assertEquals(queueSize, statistics.getEventQueueSize());
+    assertEquals(eventsReceived, statistics.getEventsReceived());
+    assertEquals(eventsQueued, statistics.getEventsQueued());
+    assertTrue(statistics.getEventsDistributed() >= eventsDistributed);
+  }
+
+  public static void checkAsyncEventQueueStats(String queueId, final int queueSize,
+      final int eventsReceived, final int eventsQueued,
+      final int eventsDistributed) {
+    Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : asyncQueues) {
+      if (q.getId().equals(queueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
+    assertEquals(queueSize, statistics.getEventQueueSize()); 
+    assertEquals(eventsReceived, statistics.getEventsReceived());
+    assertEquals(eventsQueued, statistics.getEventsQueued());
+    assertTrue(statistics.getEventsDistributed() >= eventsDistributed);
+  }
+  
+  public static void checkGatewayReceiverStats(int processBatches,
+      int eventsReceived, int creates) {
+    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+    GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
+        .getAcceptor().getStats();
+
+    assertTrue(stats instanceof GatewayReceiverStats);
+    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats;
+    assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches);
+    assertEquals(eventsReceived, gatewayReceiverStats.getEventsReceived());
+    assertEquals(creates, gatewayReceiverStats.getCreateRequest());
+  }
+
+  public static void checkMinimumGatewayReceiverStats(int processBatches,
+      int eventsReceived) {
+    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+    GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
+        .getAcceptor().getStats();
+
+    assertTrue(stats instanceof GatewayReceiverStats);
+    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats;
+    assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches);
+    assertTrue(gatewayReceiverStats.getEventsReceived()>= eventsReceived);
+  }
+  
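+  /**
+   * Checks the receiver's exception count: exactly zero when zero is expected,
+   * otherwise at least the given number of exceptions.
+   */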
+  public static void checkExcepitonStats(int exceptionsOccured) {
+    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+    GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
+        .getAcceptor().getStats();
+
+    assertTrue(stats instanceof GatewayReceiverStats);
+    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats;
+    if (exceptionsOccured == 0) {
+      assertEquals(exceptionsOccured, gatewayReceiverStats
+          .getExceptionsOccured());
+    }
+    else {
+      assertTrue(gatewayReceiverStats.getExceptionsOccured() >= exceptionsOccured);
+    }
+  }
+
+  public static void checkGatewayReceiverStatsHA(int processBatches,
+      int eventsReceived, int creates) {
+    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+    GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
+        .getAcceptor().getStats();
+
+    assertTrue(stats instanceof GatewayReceiverStats);
+    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats;
+    assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches);
+    assertTrue(gatewayReceiverStats.getEventsReceived() >= eventsReceived);
+    assertTrue(gatewayReceiverStats.getCreateRequest() >= creates);
+  }
+  
+  public static void checkEventFilteredStats(String senderId, final int eventsFiltered) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+    assertEquals(eventsFiltered, statistics.getEventsFiltered());
+  }
+  
+  public static void checkConflatedStats(String senderId, final int eventsConflated) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+    assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
+  }
+  
+  public static void checkAsyncEventQueueConflatedStats(
+      String asyncEventQueueId, final int eventsConflated) {
+    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : queues) {
+      if (q.getId().equals(asyncEventQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
+  }
+  
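+  /**
+   * Failover check: asserts that the events received by the sender equal the
+   * sum of events queued, unprocessed tokens added by the primary and
+   * unprocessed events removed by the primary.
+   */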
+  public static void checkStats_Failover(String senderId,
+      final int eventsReceived) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender)
+        .getStatistics();
+
+    assertEquals(eventsReceived, statistics.getEventsReceived());
+    assertEquals(eventsReceived, (statistics.getEventsQueued()
+        + statistics.getUnprocessedTokensAddedByPrimary() + statistics
+        .getUnprocessedEventsRemovedByPrimary()));
+  }
+  
+  public static void checkAsyncEventQueueStats_Failover(String asyncEventQueueId,
+      final int eventsReceived) {
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : asyncEventQueues) {
+      if (q.getId().equals(asyncEventQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue)
+        .getStatistics();
+
+    assertEquals(eventsReceived, statistics.getEventsReceived());
+    assertEquals(eventsReceived, (statistics.getEventsQueued()
+        + statistics.getUnprocessedTokensAddedByPrimary() + statistics
+        .getUnprocessedEventsRemovedByPrimary()));
+  }
+
+  public static void checkBatchStats(String senderId, final int batches) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender)
+        .getStatistics();
+    assertTrue(statistics.getBatchesDistributed() >= batches);
+    assertEquals(0, statistics.getBatchesRedistributed());
+  }
+  
+  public static void checkAsyncEventQueueBatchStats(String asyncQueueId,
+      final int batches) {
+    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : queues) {
+      if (q.getId().equals(asyncQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    assertTrue(statistics.getBatchesDistributed() >= batches);
+    assertEquals(0, statistics.getBatchesRedistributed());
+  }
+
+  public static void checkBatchStats(String senderId,
+      final boolean batchesDistributed, final boolean batchesRedistributed) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender)
+        .getStatistics();
+    assertEquals(batchesDistributed, (statistics.getBatchesDistributed() > 0));
+    assertEquals(batchesRedistributed,
+        (statistics.getBatchesRedistributed() > 0));
+  }
+
+  public static void checkUnProcessedStats(String senderId, int events) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender)
+        .getStatistics();
+    assertEquals(events,
+        (statistics.getUnprocessedEventsAddedBySecondary() + statistics
+            .getUnprocessedTokensRemovedBySecondary()));
+    assertEquals(events,
+        (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
+            .getUnprocessedTokensAddedByPrimary()));
+  }
+  
+  public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, int events) {
+    Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : asyncQueues) {
+      if (q.getId().equals(asyncQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
+    assertEquals(events,
+        (statistics.getUnprocessedEventsAddedBySecondary() + statistics
+            .getUnprocessedTokensRemovedBySecondary()));
+    assertEquals(events,
+        (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
+            .getUnprocessedTokensAddedByPrimary()));
+  }
+  
+  
+  public static void setRemoveFromQueueOnException(String senderId, boolean removeFromQueue){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    assertNotNull(sender);
+    ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(removeFromQueue);
+  }
+  
+  public static void unsetRemoveFromQueueOnException(String senderId){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    assertNotNull(sender);
+    ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(false);
+  }
+  
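+  /**
+   * Waits up to 5 minutes for the sender with the given id to report that it
+   * is running, treating "Could not connect" messages as expected meanwhile.
+   */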
+  public static void waitForSenderRunningState(String senderId){
+    final ExpectedException exln = addExpectedException("Could not connect");
+    try {
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      final GatewaySender sender = getGatewaySenderById(senders, senderId);
+
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          if (sender != null && sender.isRunning()) {
+            return true;
+          }
+          return false;
+        }
+
+        public String description() {
+          return "Expected sender isRunning state to be true but is false";
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 300000, 500, true);
+    } finally {
+      exln.remove();
+    }
+  }
+  
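+  /**
+   * Waits up to 10 seconds for the sender with the given id to become the
+   * primary.
+   */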
+  public static void waitForSenderToBecomePrimary(String senderId){
+    Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
+    final GatewaySender sender = getGatewaySenderById(senders, senderId);
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        if (sender != null && ((AbstractGatewaySender) sender).isPrimary()) {
+          return true;
+        }
+        return false;
+      }
+
+      public String description() {
+        return "Expected sender primary state to be true but is false";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 10000, 1000, true); 
+  }
+  
+  private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        return s;
+      }
+    }
+    //if none of the senders matches with the supplied senderid, return null
+    return null;
+  }
+  
+  public static HashMap checkQueue(){
+    HashMap listenerAttrs = new HashMap();
+    listenerAttrs.put("Create", listener1.createList);
+    listenerAttrs.put("Update", listener1.updateList);
+    listenerAttrs.put("Destroy", listener1.destroyList);
+    return listenerAttrs;
+  }
+  
+  public static void checkQueueOnSecondary (final Map primaryUpdatesMap){
+    final HashMap secondaryUpdatesMap = new HashMap();
+    secondaryUpdatesMap.put("Create", listener1.createList);
+    secondaryUpdatesMap.put("Update", listener1.updateList);
+    secondaryUpdatesMap.put("Destroy", listener1.destroyList);
+    
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        secondaryUpdatesMap.put("Create", listener1.createList);
+        secondaryUpdatesMap.put("Update", listener1.updateList);
+        secondaryUpdatesMap.put("Destroy", listener1.destroyList);
+        if (secondaryUpdatesMap.equals(primaryUpdatesMap)) {
+          return true;
+        }
+        return false;
+      }
+
+      public String description() {
+        return "Expected secondary map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap;
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 300000, 500, true); 
+  }
+  
+  public static HashMap checkQueue2(){
+    HashMap listenerAttrs = new HashMap();
+    listenerAttrs.put("Create", listener2.createList);
+    listenerAttrs.put("Update", listener2.updateList);
+    listenerAttrs.put("Destroy", listener2.destroyList);
+    return listenerAttrs;
+  }
+  
+  public static HashMap checkPR(String regionName){
+    PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+    QueueListener listener = (QueueListener)region.getCacheListener();
+    
+    HashMap listenerAttrs = new HashMap();
+    listenerAttrs.put("Create", listener.createList);
+    listenerAttrs.put("Update", listener.updateList);
+    listenerAttrs.put("Destroy", listener.destroyList);
+    return listenerAttrs;
+  }
+  
+  public static HashMap checkBR(String regionName, int numBuckets){
+    PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+    HashMap listenerAttrs = new HashMap();
+    for (int i = 0; i < numBuckets; i++) {
+      BucketRegion br = region.getBucketRegion(i);
+      QueueListener listener = (QueueListener)br.getCacheListener();
+      listenerAttrs.put("Create"+i, listener.createList);
+      listenerAttrs.put("Update"+i, listener.updateList);
+      listenerAttrs.put("Destroy"+i, listener.destroyList);
+    }
+    return listenerAttrs;
+  }
+  
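+  /**
+   * Returns the create/update/destroy event lists recorded by the
+   * QueueListener installed on the parallel sender's queue region.
+   */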
+  public static HashMap checkQueue_PR(String senderId){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    
+    RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+    .getQueues().toArray(new RegionQueue[1])[0];
+    
+    PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
+    QueueListener listener = (QueueListener)region.getCacheListener();
+    
+    HashMap listenerAttrs = new HashMap();
+    listenerAttrs.put("Create", listener.createList);
+    listenerAttrs.put("Update", listener.updateList);
+    listenerAttrs.put("Destroy", listener.destroyList);
+    return listenerAttrs;
+  }
+  
+  public static HashMap checkQueue_BR(String senderId, int numBuckets){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+    .getQueues().toArray(new RegionQueue[1])[0];
+    
+    PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
+    HashMap listenerAttrs = new HashMap();
+    for (int i = 0; i < numBuckets; i++) {
+      BucketRegion br = region.getBucketRegion(i);
+      if (br != null) {
+        QueueListener listener = (QueueListener)br.getCacheListener();
+        if (listener != null) {
+          listenerAttrs.put("Create"+i, listener.createList);
+          listenerAttrs.put("Update"+i, listener.updateList);
+          listenerAttrs.put("Destroy"+i, listener.destroyList);
+        }
+      }
+    }
+    return listenerAttrs;
+  }
+
+  public static void addListenerOnBucketRegion(String regionName, int numBuckets) {
+    WANTestBase test = new WANTestBase(testName);
+    test.addCacheListenerOnBucketRegion(regionName, numBuckets);
+  }
+  
+  private void addCacheListenerOnBucketRegion(String regionName, int numBuckets){
+    PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+    for (int i = 0; i < numBuckets; i++) {
+      BucketRegion br = region.getBucketRegion(i);
+      AttributesMutator mutator = br.getAttributesMutator();
+      listener1 = new QueueListener();
+      mutator.addCacheListener(listener1);
+    }
+  }
+  
+  public static void addListenerOnQueueBucketRegion(String senderId, int numBuckets) {
+    WANTestBase test = new WANTestBase(testName);
+    test.addCacheListenerOnQueueBucketRegion(senderId, numBuckets);
+  }
+  
+  private void addCacheListenerOnQueueBucketRegion(String senderId, int numBuckets){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+    .getQueues().toArray(new RegionQueue[1])[0];
+    
+    PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
+    for (int i = 0; i < numBuckets; i++) {
+      BucketRegion br = region.getBucketRegion(i);
+      if (br != null) {
+        AttributesMutator mutator = br.getAttributesMutator();
+        CacheListener listener = new QueueListener();
+        mutator.addCacheListener(listener);
+      }
+    }
+    
+  }
+  
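+  /**
+   * Installs a QueueListener on the sender's queue: on each queue for a serial
+   * sender, or on the single shared queue region for a parallel sender.
+   */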
+  public static void addQueueListener(String senderId, boolean isParallel){
+    WANTestBase test = new WANTestBase(testName);
+    test.addCacheQueueListener(senderId, isParallel);
+  }
+  
+  public static void addSecondQueueListener(String senderId, boolean isParallel){
+    WANTestBase test = new WANTestBase(testName);
+    test.addSecondCacheQueueListener(senderId, isParallel);
+  }
+  
+  public static void addListenerOnRegion(String regionName){
+    WANTestBase test = new WANTestBase(testName);
+    test.addCacheListenerOnRegion(regionName);
+  }
+  private void addCacheListenerOnRegion(String regionName){
+    Region region = cache.getRegion(regionName);
+    AttributesMutator mutator = region.getAttributesMutator();
+    listener1 = new QueueListener();
+    mutator.addCacheListener(listener1);
+  }
+  
+  private void addCacheQueueListener(String senderId, boolean isParallel) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    listener1 = new QueueListener();
+    if (!isParallel) {
+      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+      for(RegionQueue q: queues) {
+        q.addCacheListener(listener1);
+      }
+    }
+    else {
+      RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+      .getQueues().toArray(new RegionQueue[1])[0];
+      parallelQueue.addCacheListener(listener1);
+    }
+  }
+
+  private void addSecondCacheQueueListener(String senderId, boolean isParallel) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    listener2 = new QueueListener();
+    if (!isParallel) {
+      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+      for(RegionQueue q: queues) {
+        q.addCacheListener(listener2);
+      }
+    }
+    else {
+      RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+      .getQueues().toArray(new RegionQueue[1])[0];
+      parallelQueue.addCacheListener(listener2);
+    }
+  }
+  
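+  /**
+   * Pauses the sender with the given id, treating connection failures and
+   * ForceReattemptException as expected while doing so.
+   */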
+  public static void pauseSender(String senderId) {
+    final ExpectedException exln = addExpectedException("Could not connect");
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      GatewaySender sender = null;
+      for (GatewaySender s : senders) {
+        if (s.getId().equals(senderId)) {
+          sender = s;
+          break;
+        }
+      }
+      sender.pause();
+    }
+    finally {
+      exp.remove();
+      exln.remove();
+    }
+  }
+  
+  public static void pauseSenderAndWaitForDispatcherToPause(String senderId) {
+    final ExpectedException exln = addExpectedException("Could not connect");
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      GatewaySender sender = null;
+      for (GatewaySender s : senders) {
+        if (s.getId().equals(senderId)) {
+          sender = s;
+          break;
+        }
+      }
+      sender.pause();
+      ((AbstractGatewaySender)sender).getEventProcessor().waitForDispatcher

<TRUNCATED>

